session.rs

  1use crate::components::KernelListItem;
  2use crate::KernelStatus;
  3use crate::{
  4    kernels::{Kernel, KernelSpecification, RunningKernel},
  5    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  6};
  7use client::telemetry::Telemetry;
  8use collections::{HashMap, HashSet};
  9use editor::{
 10    display_map::{
 11        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 12        RenderBlock,
 13    },
 14    scroll::Autoscroll,
 15    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 16};
 17use futures::io::BufReader;
 18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
 19use gpui::{
 20    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 21    WeakView,
 22};
 23use language::Point;
 24use project::Fs;
 25use runtimelib::{
 26    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 27    ShutdownRequest,
 28};
 29use settings::Settings as _;
 30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 31use theme::{ActiveTheme, ThemeSettings};
 32use ui::{prelude::*, IconButtonShape, Tooltip};
 33
 34pub struct Session {
 35    editor: WeakView<Editor>,
 36    pub kernel: Kernel,
 37    blocks: HashMap<String, EditorBlock>,
 38    messaging_task: Task<()>,
 39    pub kernel_specification: KernelSpecification,
 40    telemetry: Arc<Telemetry>,
 41    _buffer_subscription: Subscription,
 42}
 43
 44struct EditorBlock {
 45    editor: WeakView<Editor>,
 46    code_range: Range<Anchor>,
 47    invalidation_anchor: Anchor,
 48    block_id: CustomBlockId,
 49    execution_view: View<ExecutionView>,
 50    on_close: CloseBlockFn,
 51}
 52
 53type CloseBlockFn =
 54    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 55
 56impl EditorBlock {
 57    fn new(
 58        editor: WeakView<Editor>,
 59        code_range: Range<Anchor>,
 60        status: ExecutionStatus,
 61        on_close: CloseBlockFn,
 62        cx: &mut ViewContext<Session>,
 63    ) -> anyhow::Result<Self> {
 64        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 65
 66        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 67            let buffer = editor.buffer().clone();
 68            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 69            let end_point = code_range.end.to_point(&buffer_snapshot);
 70            let next_row_start = end_point + Point::new(1, 0);
 71            if next_row_start > buffer_snapshot.max_point() {
 72                buffer.update(cx, |buffer, cx| {
 73                    buffer.edit(
 74                        [(
 75                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 76                            "\n",
 77                        )],
 78                        None,
 79                        cx,
 80                    )
 81                });
 82            }
 83
 84            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 85            let block = BlockProperties {
 86                position: code_range.end,
 87                height: execution_view.num_lines(cx).saturating_add(1),
 88                style: BlockStyle::Sticky,
 89                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 90                disposition: BlockDisposition::Below,
 91            };
 92
 93            let block_id = editor.insert_blocks([block], None, cx)[0];
 94            (block_id, invalidation_anchor)
 95        })?;
 96
 97        anyhow::Ok(Self {
 98            editor,
 99            code_range,
100            invalidation_anchor,
101            block_id,
102            execution_view,
103            on_close,
104        })
105    }
106
107    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
108        self.execution_view.update(cx, |execution_view, cx| {
109            execution_view.push_message(&message.content, cx);
110        });
111
112        self.editor
113            .update(cx, |editor, cx| {
114                let mut replacements = HashMap::default();
115
116                replacements.insert(
117                    self.block_id,
118                    (
119                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
120                        Self::create_output_area_renderer(
121                            self.execution_view.clone(),
122                            self.on_close.clone(),
123                        ),
124                    ),
125                );
126                editor.replace_blocks(replacements, None, cx);
127            })
128            .ok();
129    }
130
131    fn create_output_area_renderer(
132        execution_view: View<ExecutionView>,
133        on_close: CloseBlockFn,
134    ) -> RenderBlock {
135        let render = move |cx: &mut BlockContext| {
136            let execution_view = execution_view.clone();
137            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
138            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
139
140            let gutter = cx.gutter_dimensions;
141            let close_button_size = IconSize::XSmall;
142
143            let block_id = cx.block_id;
144            let on_close = on_close.clone();
145
146            let rem_size = cx.rem_size();
147            let line_height = cx.text_style().line_height_in_pixels(rem_size);
148
149            let (close_button_width, close_button_padding) =
150                close_button_size.square_components(cx);
151
152            div()
153                .min_h(line_height)
154                .flex()
155                .flex_row()
156                .items_start()
157                .w_full()
158                .bg(cx.theme().colors().background)
159                .border_y_1()
160                .border_color(cx.theme().colors().border)
161                .child(
162                    v_flex().min_h(cx.line_height()).justify_center().child(
163                        h_flex()
164                            .w(gutter.full_width())
165                            .justify_end()
166                            .pt(line_height / 2.)
167                            .child(
168                                h_flex()
169                                    .pr(gutter.width / 2. - close_button_width
170                                        + close_button_padding / 2.)
171                                    .child(
172                                        IconButton::new(
173                                            ("close_output_area", EntityId::from(cx.block_id)),
174                                            IconName::Close,
175                                        )
176                                        .shape(IconButtonShape::Square)
177                                        .icon_size(close_button_size)
178                                        .icon_color(Color::Muted)
179                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
180                                        .on_click(
181                                            move |_, cx| {
182                                                if let BlockId::Custom(block_id) = block_id {
183                                                    (on_close)(block_id, cx)
184                                                }
185                                            },
186                                        ),
187                                    ),
188                            ),
189                    ),
190                )
191                .child(
192                    div()
193                        .flex_1()
194                        .size_full()
195                        .my_2()
196                        .mr(gutter.width)
197                        .text_size(text_font_size)
198                        .font_family(text_font)
199                        .child(execution_view),
200                )
201                .into_any_element()
202        };
203
204        Box::new(render)
205    }
206}
207
208impl Session {
209    pub fn new(
210        editor: WeakView<Editor>,
211        fs: Arc<dyn Fs>,
212        telemetry: Arc<Telemetry>,
213        kernel_specification: KernelSpecification,
214        cx: &mut ViewContext<Self>,
215    ) -> Self {
216        let kernel_language = kernel_specification.kernelspec.language.clone();
217
218        telemetry.report_repl_event(
219            kernel_language.clone(),
220            KernelStatus::Starting.to_string(),
221            cx.entity_id().to_string(),
222        );
223
224        let entity_id = editor.entity_id();
225        let working_directory = editor
226            .upgrade()
227            .and_then(|editor| editor.read(cx).working_directory(cx))
228            .unwrap_or_else(temp_dir);
229        let kernel = RunningKernel::new(
230            kernel_specification.clone(),
231            entity_id,
232            working_directory,
233            fs.clone(),
234            cx,
235        );
236
237        let pending_kernel = cx
238            .spawn(|this, mut cx| async move {
239                let kernel = kernel.await;
240
241                match kernel {
242                    Ok((mut kernel, mut messages_rx)) => {
243                        this.update(&mut cx, |session, cx| {
244                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
245
246                            let stderr = kernel.process.stderr.take();
247
248                            cx.spawn(|_session, mut _cx| async move {
249                                if let None = stderr {
250                                    return;
251                                }
252                                let reader = BufReader::new(stderr.unwrap());
253                                let mut lines = reader.lines();
254                                while let Some(Ok(line)) = lines.next().await {
255                                    log::error!("kernel: {}", line);
256                                }
257                            })
258                            .detach();
259
260                            let stdout = kernel.process.stderr.take();
261
262                            cx.spawn(|_session, mut _cx| async move {
263                                if let None = stdout {
264                                    return;
265                                }
266                                let reader = BufReader::new(stdout.unwrap());
267                                let mut lines = reader.lines();
268                                while let Some(Ok(line)) = lines.next().await {
269                                    log::info!("kernel: {}", line);
270                                }
271                            })
272                            .detach();
273
274                            let status = kernel.process.status();
275                            session.kernel(Kernel::RunningKernel(kernel), cx);
276
277                            cx.spawn(|session, mut cx| async move {
278                                let error_message = match status.await {
279                                    Ok(status) => {
280                                        if status.success() {
281                                            log::info!("kernel process exited successfully");
282                                            return;
283                                        }
284
285                                        format!("kernel process exited with status: {:?}", status)
286                                    }
287                                    Err(err) => {
288                                        format!("kernel process exited with error: {:?}", err)
289                                    }
290                                };
291
292                                log::error!("{}", error_message);
293
294                                session
295                                    .update(&mut cx, |session, cx| {
296                                        session.kernel(
297                                            Kernel::ErroredLaunch(error_message.clone()),
298                                            cx,
299                                        );
300
301                                        session.blocks.values().for_each(|block| {
302                                            block.execution_view.update(
303                                                cx,
304                                                |execution_view, cx| {
305                                                    match execution_view.status {
306                                                        ExecutionStatus::Finished => {
307                                                            // Do nothing when the output was good
308                                                        }
309                                                        _ => {
310                                                            // All other cases, set the status to errored
311                                                            execution_view.status =
312                                                                ExecutionStatus::KernelErrored(
313                                                                    error_message.clone(),
314                                                                )
315                                                        }
316                                                    }
317                                                    cx.notify();
318                                                },
319                                            );
320                                        });
321
322                                        cx.notify();
323                                    })
324                                    .ok();
325                            })
326                            .detach();
327
328                            session.messaging_task = cx.spawn(|session, mut cx| async move {
329                                while let Some(message) = messages_rx.next().await {
330                                    session
331                                        .update(&mut cx, |session, cx| {
332                                            session.route(&message, cx);
333                                        })
334                                        .ok();
335                                }
336                            });
337
338                            // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
339                            // cx.spawn(|this, mut cx| async move {
340                            //     cx.background_executor()
341                            //         .timer(Duration::from_millis(120))
342                            //         .await;
343                            //     this.update(&mut cx, |this, cx| {
344                            //         this.send(KernelInfoRequest {}.into(), cx).ok();
345                            //     })
346                            //     .ok();
347                            // })
348                            // .detach();
349                        })
350                        .ok();
351                    }
352                    Err(err) => {
353                        this.update(&mut cx, |session, cx| {
354                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
355                        })
356                        .ok();
357                    }
358                }
359            })
360            .shared();
361
362        let subscription = match editor.upgrade() {
363            Some(editor) => {
364                let buffer = editor.read(cx).buffer().clone();
365                cx.subscribe(&buffer, Self::on_buffer_event)
366            }
367            None => Subscription::new(|| {}),
368        };
369
370        return Self {
371            editor,
372            kernel: Kernel::StartingKernel(pending_kernel),
373            messaging_task: Task::ready(()),
374            blocks: HashMap::default(),
375            kernel_specification,
376            _buffer_subscription: subscription,
377            telemetry,
378        };
379    }
380
381    fn on_buffer_event(
382        &mut self,
383        buffer: Model<MultiBuffer>,
384        event: &multi_buffer::Event,
385        cx: &mut ViewContext<Self>,
386    ) {
387        if let multi_buffer::Event::Edited { .. } = event {
388            let snapshot = buffer.read(cx).snapshot(cx);
389
390            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
391
392            self.blocks.retain(|_id, block| {
393                if block.invalidation_anchor.is_valid(&snapshot) {
394                    true
395                } else {
396                    blocks_to_remove.insert(block.block_id);
397                    false
398                }
399            });
400
401            if !blocks_to_remove.is_empty() {
402                self.editor
403                    .update(cx, |editor, cx| {
404                        editor.remove_blocks(blocks_to_remove, None, cx);
405                    })
406                    .ok();
407                cx.notify();
408            }
409        }
410    }
411
412    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
413        match &mut self.kernel {
414            Kernel::RunningKernel(kernel) => {
415                kernel.request_tx.try_send(message).ok();
416            }
417            _ => {}
418        }
419
420        anyhow::Ok(())
421    }
422
423    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
424        let blocks_to_remove: HashSet<CustomBlockId> =
425            self.blocks.values().map(|block| block.block_id).collect();
426
427        self.editor
428            .update(cx, |editor, cx| {
429                editor.remove_blocks(blocks_to_remove, None, cx);
430            })
431            .ok();
432
433        self.blocks.clear();
434    }
435
436    pub fn execute(
437        &mut self,
438        code: String,
439        anchor_range: Range<Anchor>,
440        next_cell: Option<Anchor>,
441        cx: &mut ViewContext<Self>,
442    ) {
443        let Some(editor) = self.editor.upgrade() else {
444            return;
445        };
446
447        if code.is_empty() {
448            return;
449        }
450
451        let execute_request = ExecuteRequest {
452            code,
453            ..ExecuteRequest::default()
454        };
455
456        let message: JupyterMessage = execute_request.into();
457
458        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
459
460        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
461
462        self.blocks.retain(|_key, block| {
463            if anchor_range.overlaps(&block.code_range, &buffer) {
464                blocks_to_remove.insert(block.block_id);
465                false
466            } else {
467                true
468            }
469        });
470
471        self.editor
472            .update(cx, |editor, cx| {
473                editor.remove_blocks(blocks_to_remove, None, cx);
474            })
475            .ok();
476
477        let status = match &self.kernel {
478            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
479            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
480            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
481            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
482            Kernel::Shutdown => ExecutionStatus::Shutdown,
483        };
484
485        let parent_message_id = message.header.msg_id.clone();
486        let session_view = cx.view().downgrade();
487        let weak_editor = self.editor.clone();
488
489        let on_close: CloseBlockFn =
490            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
491                if let Some(session) = session_view.upgrade() {
492                    session.update(cx, |session, cx| {
493                        session.blocks.remove(&parent_message_id);
494                        cx.notify();
495                    });
496                }
497
498                if let Some(editor) = weak_editor.upgrade() {
499                    editor.update(cx, |editor, cx| {
500                        let mut block_ids = HashSet::default();
501                        block_ids.insert(block_id);
502                        editor.remove_blocks(block_ids, None, cx);
503                    });
504                }
505            });
506
507        let Ok(editor_block) =
508            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
509        else {
510            return;
511        };
512
513        let new_cursor_pos = if let Some(next_cursor) = next_cell {
514            next_cursor
515        } else {
516            editor_block.invalidation_anchor
517        };
518
519        self.blocks
520            .insert(message.header.msg_id.clone(), editor_block);
521
522        match &self.kernel {
523            Kernel::RunningKernel(_) => {
524                self.send(message, cx).ok();
525            }
526            Kernel::StartingKernel(task) => {
527                // Queue up the execution as a task to run after the kernel starts
528                let task = task.clone();
529                let message = message.clone();
530
531                cx.spawn(|this, mut cx| async move {
532                    task.await;
533                    this.update(&mut cx, |session, cx| {
534                        session.send(message, cx).ok();
535                    })
536                    .ok();
537                })
538                .detach();
539            }
540            _ => {}
541        }
542
543        // Now move the cursor to after the block
544        editor.update(cx, move |editor, cx| {
545            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
546                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
547            });
548        });
549    }
550
551    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
552        let parent_message_id = match message.parent_header.as_ref() {
553            Some(header) => &header.msg_id,
554            None => return,
555        };
556
557        match &message.content {
558            JupyterMessageContent::Status(status) => {
559                self.kernel.set_execution_state(&status.execution_state);
560
561                self.telemetry.report_repl_event(
562                    self.kernel_specification.kernelspec.language.clone(),
563                    KernelStatus::from(&self.kernel).to_string(),
564                    cx.entity_id().to_string(),
565                );
566
567                cx.notify();
568            }
569            JupyterMessageContent::KernelInfoReply(reply) => {
570                self.kernel.set_kernel_info(&reply);
571                cx.notify();
572            }
573            _ => {}
574        }
575
576        if let Some(block) = self.blocks.get_mut(parent_message_id) {
577            block.handle_message(&message, cx);
578            return;
579        }
580    }
581
582    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
583        match &mut self.kernel {
584            Kernel::RunningKernel(_kernel) => {
585                self.send(InterruptRequest {}.into(), cx).ok();
586            }
587            Kernel::StartingKernel(_task) => {
588                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
589            }
590            _ => {}
591        }
592    }
593
594    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
595        if let Kernel::Shutdown = kernel {
596            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
597        }
598
599        let kernel_status = KernelStatus::from(&kernel).to_string();
600        let kernel_language = self.kernel_specification.kernelspec.language.clone();
601
602        self.telemetry.report_repl_event(
603            kernel_language,
604            kernel_status,
605            cx.entity_id().to_string(),
606        );
607
608        self.kernel = kernel;
609    }
610
611    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
612        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
613
614        match kernel {
615            Kernel::RunningKernel(mut kernel) => {
616                let mut request_tx = kernel.request_tx.clone();
617
618                cx.spawn(|this, mut cx| async move {
619                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
620                    request_tx.try_send(message).ok();
621
622                    // Give the kernel a bit of time to clean up
623                    cx.background_executor().timer(Duration::from_secs(3)).await;
624
625                    kernel.process.kill().ok();
626
627                    this.update(&mut cx, |session, cx| {
628                        session.clear_outputs(cx);
629                        session.kernel(Kernel::Shutdown, cx);
630                        cx.notify();
631                    })
632                    .ok();
633                })
634                .detach();
635            }
636            Kernel::StartingKernel(_kernel) => {
637                self.kernel = Kernel::Shutdown;
638            }
639            _ => {
640                self.kernel = Kernel::Shutdown;
641            }
642        }
643        cx.notify();
644    }
645}
646
647pub enum SessionEvent {
648    Shutdown(WeakView<Editor>),
649}
650
651impl EventEmitter<SessionEvent> for Session {}
652
653impl Render for Session {
654    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
655        let (status_text, interrupt_button) = match &self.kernel {
656            Kernel::RunningKernel(kernel) => (
657                kernel
658                    .kernel_info
659                    .as_ref()
660                    .map(|info| info.language_info.name.clone()),
661                Some(
662                    Button::new("interrupt", "Interrupt")
663                        .style(ButtonStyle::Subtle)
664                        .on_click(cx.listener(move |session, _, cx| {
665                            session.interrupt(cx);
666                        })),
667                ),
668            ),
669            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
670            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
671            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
672            Kernel::Shutdown => (Some("Shutdown".into()), None),
673        };
674
675        KernelListItem::new(self.kernel_specification.clone())
676            .status_color(match &self.kernel {
677                Kernel::RunningKernel(kernel) => match kernel.execution_state {
678                    ExecutionState::Idle => Color::Success,
679                    ExecutionState::Busy => Color::Modified,
680                },
681                Kernel::StartingKernel(_) => Color::Modified,
682                Kernel::ErroredLaunch(_) => Color::Error,
683                Kernel::ShuttingDown => Color::Modified,
684                Kernel::Shutdown => Color::Disabled,
685            })
686            .child(Label::new(self.kernel_specification.name.clone()))
687            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
688            .button(
689                Button::new("shutdown", "Shutdown")
690                    .style(ButtonStyle::Subtle)
691                    .disabled(self.kernel.is_shutting_down())
692                    .on_click(cx.listener(move |session, _, cx| {
693                        session.shutdown(cx);
694                    })),
695            )
696            .buttons(interrupt_button)
697    }
698}