session.rs

  1use crate::components::KernelListItem;
  2use crate::{
  3    kernels::{Kernel, KernelSpecification, RunningKernel},
  4    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  5};
  6use collections::{HashMap, HashSet};
  7use editor::{
  8    display_map::{
  9        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 10        RenderBlock,
 11    },
 12    scroll::Autoscroll,
 13    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 14};
 15use futures::{FutureExt as _, StreamExt as _};
 16use gpui::{
 17    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 18    WeakView,
 19};
 20use language::Point;
 21use project::Fs;
 22use runtimelib::{
 23    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 24    ShutdownRequest,
 25};
 26use settings::Settings as _;
 27use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 28use theme::{ActiveTheme, ThemeSettings};
 29use ui::{prelude::*, IconButtonShape, Tooltip};
 30
 31pub struct Session {
 32    editor: WeakView<Editor>,
 33    pub kernel: Kernel,
 34    blocks: HashMap<String, EditorBlock>,
 35    messaging_task: Task<()>,
 36    pub kernel_specification: KernelSpecification,
 37    _buffer_subscription: Subscription,
 38}
 39
 40struct EditorBlock {
 41    editor: WeakView<Editor>,
 42    code_range: Range<Anchor>,
 43    invalidation_anchor: Anchor,
 44    block_id: CustomBlockId,
 45    execution_view: View<ExecutionView>,
 46    on_close: CloseBlockFn,
 47}
 48
 49type CloseBlockFn =
 50    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 51
 52impl EditorBlock {
 53    fn new(
 54        editor: WeakView<Editor>,
 55        code_range: Range<Anchor>,
 56        status: ExecutionStatus,
 57        on_close: CloseBlockFn,
 58        cx: &mut ViewContext<Session>,
 59    ) -> anyhow::Result<Self> {
 60        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 61
 62        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 63            let buffer = editor.buffer().clone();
 64            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 65            let end_point = code_range.end.to_point(&buffer_snapshot);
 66            let next_row_start = end_point + Point::new(1, 0);
 67            if next_row_start > buffer_snapshot.max_point() {
 68                buffer.update(cx, |buffer, cx| {
 69                    buffer.edit(
 70                        [(
 71                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 72                            "\n",
 73                        )],
 74                        None,
 75                        cx,
 76                    )
 77                });
 78            }
 79
 80            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 81            let block = BlockProperties {
 82                position: code_range.end,
 83                height: execution_view.num_lines(cx).saturating_add(1),
 84                style: BlockStyle::Sticky,
 85                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 86                disposition: BlockDisposition::Below,
 87            };
 88
 89            let block_id = editor.insert_blocks([block], None, cx)[0];
 90            (block_id, invalidation_anchor)
 91        })?;
 92
 93        anyhow::Ok(Self {
 94            editor,
 95            code_range,
 96            invalidation_anchor,
 97            block_id,
 98            execution_view,
 99            on_close,
100        })
101    }
102
103    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104        self.execution_view.update(cx, |execution_view, cx| {
105            execution_view.push_message(&message.content, cx);
106        });
107
108        self.editor
109            .update(cx, |editor, cx| {
110                let mut replacements = HashMap::default();
111
112                replacements.insert(
113                    self.block_id,
114                    (
115                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
116                        Self::create_output_area_renderer(
117                            self.execution_view.clone(),
118                            self.on_close.clone(),
119                        ),
120                    ),
121                );
122                editor.replace_blocks(replacements, None, cx);
123            })
124            .ok();
125    }
126
127    fn create_output_area_renderer(
128        execution_view: View<ExecutionView>,
129        on_close: CloseBlockFn,
130    ) -> RenderBlock {
131        let render = move |cx: &mut BlockContext| {
132            let execution_view = execution_view.clone();
133            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
134            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
135
136            let gutter = cx.gutter_dimensions;
137            let close_button_size = IconSize::XSmall;
138
139            let block_id = cx.block_id;
140            let on_close = on_close.clone();
141
142            let rem_size = cx.rem_size();
143            let line_height = cx.text_style().line_height_in_pixels(rem_size);
144
145            let (close_button_width, close_button_padding) =
146                close_button_size.square_components(cx);
147
148            div()
149                .min_h(line_height)
150                .flex()
151                .flex_row()
152                .items_start()
153                .w_full()
154                .bg(cx.theme().colors().background)
155                .border_y_1()
156                .border_color(cx.theme().colors().border)
157                .child(
158                    v_flex().min_h(cx.line_height()).justify_center().child(
159                        h_flex()
160                            .w(gutter.full_width())
161                            .justify_end()
162                            .pt(line_height / 2.)
163                            .child(
164                                h_flex()
165                                    .pr(gutter.width / 2. - close_button_width
166                                        + close_button_padding / 2.)
167                                    .child(
168                                        IconButton::new(
169                                            ("close_output_area", EntityId::from(cx.block_id)),
170                                            IconName::Close,
171                                        )
172                                        .shape(IconButtonShape::Square)
173                                        .icon_size(close_button_size)
174                                        .icon_color(Color::Muted)
175                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
176                                        .on_click(
177                                            move |_, cx| {
178                                                if let BlockId::Custom(block_id) = block_id {
179                                                    (on_close)(block_id, cx)
180                                                }
181                                            },
182                                        ),
183                                    ),
184                            ),
185                    ),
186                )
187                .child(
188                    div()
189                        .flex_1()
190                        .size_full()
191                        .my_2()
192                        .mr(gutter.width)
193                        .text_size(text_font_size)
194                        .font_family(text_font)
195                        .child(execution_view),
196                )
197                .into_any_element()
198        };
199
200        Box::new(render)
201    }
202}
203
204impl Session {
205    pub fn new(
206        editor: WeakView<Editor>,
207        fs: Arc<dyn Fs>,
208        kernel_specification: KernelSpecification,
209        cx: &mut ViewContext<Self>,
210    ) -> Self {
211        let entity_id = editor.entity_id();
212        let working_directory = editor
213            .upgrade()
214            .and_then(|editor| editor.read(cx).working_directory(cx))
215            .unwrap_or_else(temp_dir);
216        let kernel = RunningKernel::new(
217            kernel_specification.clone(),
218            entity_id,
219            working_directory,
220            fs.clone(),
221            cx,
222        );
223
224        let pending_kernel = cx
225            .spawn(|this, mut cx| async move {
226                let kernel = kernel.await;
227
228                match kernel {
229                    Ok((mut kernel, mut messages_rx)) => {
230                        this.update(&mut cx, |this, cx| {
231                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
232
233                            let status = kernel.process.status();
234                            this.kernel = Kernel::RunningKernel(kernel);
235
236                            cx.spawn(|session, mut cx| async move {
237                                let error_message = match status.await {
238                                    Ok(status) => {
239                                        if status.success() {
240                                            log::info!("kernel process exited successfully");
241                                            return;
242                                        }
243
244                                        format!("kernel process exited with status: {:?}", status)
245                                    }
246                                    Err(err) => {
247                                        format!("kernel process exited with error: {:?}", err)
248                                    }
249                                };
250
251                                log::error!("{}", error_message);
252
253                                session
254                                    .update(&mut cx, |session, cx| {
255                                        session.kernel =
256                                            Kernel::ErroredLaunch(error_message.clone());
257
258                                        session.blocks.values().for_each(|block| {
259                                            block.execution_view.update(
260                                                cx,
261                                                |execution_view, cx| {
262                                                    match execution_view.status {
263                                                        ExecutionStatus::Finished => {
264                                                            // Do nothing when the output was good
265                                                        }
266                                                        _ => {
267                                                            // All other cases, set the status to errored
268                                                            execution_view.status =
269                                                                ExecutionStatus::KernelErrored(
270                                                                    error_message.clone(),
271                                                                )
272                                                        }
273                                                    }
274                                                    cx.notify();
275                                                },
276                                            );
277                                        });
278
279                                        cx.notify();
280                                    })
281                                    .ok();
282                            })
283                            .detach();
284
285                            this.messaging_task = cx.spawn(|session, mut cx| async move {
286                                while let Some(message) = messages_rx.next().await {
287                                    session
288                                        .update(&mut cx, |session, cx| {
289                                            session.route(&message, cx);
290                                        })
291                                        .ok();
292                                }
293                            });
294                        })
295                        .ok();
296                    }
297                    Err(err) => {
298                        this.update(&mut cx, |this, _cx| {
299                            this.kernel = Kernel::ErroredLaunch(err.to_string());
300                        })
301                        .ok();
302                    }
303                }
304            })
305            .shared();
306
307        let subscription = match editor.upgrade() {
308            Some(editor) => {
309                let buffer = editor.read(cx).buffer().clone();
310                cx.subscribe(&buffer, Self::on_buffer_event)
311            }
312            None => Subscription::new(|| {}),
313        };
314
315        return Self {
316            editor,
317            kernel: Kernel::StartingKernel(pending_kernel),
318            messaging_task: Task::ready(()),
319            blocks: HashMap::default(),
320            kernel_specification,
321            _buffer_subscription: subscription,
322        };
323    }
324
325    fn on_buffer_event(
326        &mut self,
327        buffer: Model<MultiBuffer>,
328        event: &multi_buffer::Event,
329        cx: &mut ViewContext<Self>,
330    ) {
331        if let multi_buffer::Event::Edited { .. } = event {
332            let snapshot = buffer.read(cx).snapshot(cx);
333
334            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
335
336            self.blocks.retain(|_id, block| {
337                if block.invalidation_anchor.is_valid(&snapshot) {
338                    true
339                } else {
340                    blocks_to_remove.insert(block.block_id);
341                    false
342                }
343            });
344
345            if !blocks_to_remove.is_empty() {
346                self.editor
347                    .update(cx, |editor, cx| {
348                        editor.remove_blocks(blocks_to_remove, None, cx);
349                    })
350                    .ok();
351                cx.notify();
352            }
353        }
354    }
355
356    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
357        match &mut self.kernel {
358            Kernel::RunningKernel(kernel) => {
359                kernel.request_tx.try_send(message).ok();
360            }
361            _ => {}
362        }
363
364        anyhow::Ok(())
365    }
366
367    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
368        let blocks_to_remove: HashSet<CustomBlockId> =
369            self.blocks.values().map(|block| block.block_id).collect();
370
371        self.editor
372            .update(cx, |editor, cx| {
373                editor.remove_blocks(blocks_to_remove, None, cx);
374            })
375            .ok();
376
377        self.blocks.clear();
378    }
379
380    pub fn execute(
381        &mut self,
382        code: String,
383        anchor_range: Range<Anchor>,
384        next_cell: Option<Anchor>,
385        cx: &mut ViewContext<Self>,
386    ) {
387        let Some(editor) = self.editor.upgrade() else {
388            return;
389        };
390
391        if code.is_empty() {
392            return;
393        }
394
395        let execute_request = ExecuteRequest {
396            code,
397            ..ExecuteRequest::default()
398        };
399
400        let message: JupyterMessage = execute_request.into();
401
402        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
403
404        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
405
406        self.blocks.retain(|_key, block| {
407            if anchor_range.overlaps(&block.code_range, &buffer) {
408                blocks_to_remove.insert(block.block_id);
409                false
410            } else {
411                true
412            }
413        });
414
415        self.editor
416            .update(cx, |editor, cx| {
417                editor.remove_blocks(blocks_to_remove, None, cx);
418            })
419            .ok();
420
421        let status = match &self.kernel {
422            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
423            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
424            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
425            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
426            Kernel::Shutdown => ExecutionStatus::Shutdown,
427        };
428
429        let parent_message_id = message.header.msg_id.clone();
430        let session_view = cx.view().downgrade();
431        let weak_editor = self.editor.clone();
432
433        let on_close: CloseBlockFn =
434            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
435                if let Some(session) = session_view.upgrade() {
436                    session.update(cx, |session, cx| {
437                        session.blocks.remove(&parent_message_id);
438                        cx.notify();
439                    });
440                }
441
442                if let Some(editor) = weak_editor.upgrade() {
443                    editor.update(cx, |editor, cx| {
444                        let mut block_ids = HashSet::default();
445                        block_ids.insert(block_id);
446                        editor.remove_blocks(block_ids, None, cx);
447                    });
448                }
449            });
450
451        let Ok(editor_block) =
452            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
453        else {
454            return;
455        };
456
457        let new_cursor_pos = if let Some(next_cursor) = next_cell {
458            next_cursor
459        } else {
460            editor_block.invalidation_anchor
461        };
462
463        self.blocks
464            .insert(message.header.msg_id.clone(), editor_block);
465
466        match &self.kernel {
467            Kernel::RunningKernel(_) => {
468                self.send(message, cx).ok();
469            }
470            Kernel::StartingKernel(task) => {
471                // Queue up the execution as a task to run after the kernel starts
472                let task = task.clone();
473                let message = message.clone();
474
475                cx.spawn(|this, mut cx| async move {
476                    task.await;
477                    this.update(&mut cx, |this, cx| {
478                        this.send(message, cx).ok();
479                    })
480                    .ok();
481                })
482                .detach();
483            }
484            _ => {}
485        }
486
487        // Now move the cursor to after the block
488        editor.update(cx, move |editor, cx| {
489            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
490                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
491            });
492        });
493    }
494
495    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
496        let parent_message_id = match message.parent_header.as_ref() {
497            Some(header) => &header.msg_id,
498            None => return,
499        };
500
501        match &message.content {
502            JupyterMessageContent::Status(status) => {
503                self.kernel.set_execution_state(&status.execution_state);
504                cx.notify();
505            }
506            JupyterMessageContent::KernelInfoReply(reply) => {
507                self.kernel.set_kernel_info(&reply);
508                cx.notify();
509            }
510            _ => {}
511        }
512
513        if let Some(block) = self.blocks.get_mut(parent_message_id) {
514            block.handle_message(&message, cx);
515            return;
516        }
517    }
518
519    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
520        match &mut self.kernel {
521            Kernel::RunningKernel(_kernel) => {
522                self.send(InterruptRequest {}.into(), cx).ok();
523            }
524            Kernel::StartingKernel(_task) => {
525                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
526            }
527            _ => {}
528        }
529    }
530
531    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
532        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
533
534        match kernel {
535            Kernel::RunningKernel(mut kernel) => {
536                let mut request_tx = kernel.request_tx.clone();
537
538                cx.spawn(|this, mut cx| async move {
539                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
540                    request_tx.try_send(message).ok();
541
542                    // Give the kernel a bit of time to clean up
543                    cx.background_executor().timer(Duration::from_secs(3)).await;
544
545                    kernel.process.kill().ok();
546
547                    this.update(&mut cx, |this, cx| {
548                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
549                        this.clear_outputs(cx);
550                        this.kernel = Kernel::Shutdown;
551                        cx.notify();
552                    })
553                    .ok();
554                })
555                .detach();
556            }
557            Kernel::StartingKernel(_kernel) => {
558                self.kernel = Kernel::Shutdown;
559            }
560            _ => {
561                self.kernel = Kernel::Shutdown;
562            }
563        }
564        cx.notify();
565    }
566}
567
568pub enum SessionEvent {
569    Shutdown(WeakView<Editor>),
570}
571
572impl EventEmitter<SessionEvent> for Session {}
573
574impl Render for Session {
575    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
576        let (status_text, interrupt_button) = match &self.kernel {
577            Kernel::RunningKernel(kernel) => (
578                kernel
579                    .kernel_info
580                    .as_ref()
581                    .map(|info| info.language_info.name.clone()),
582                Some(
583                    Button::new("interrupt", "Interrupt")
584                        .style(ButtonStyle::Subtle)
585                        .on_click(cx.listener(move |session, _, cx| {
586                            session.interrupt(cx);
587                        })),
588                ),
589            ),
590            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
591            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
592            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
593            Kernel::Shutdown => (Some("Shutdown".into()), None),
594        };
595
596        KernelListItem::new(self.kernel_specification.clone())
597            .status_color(match &self.kernel {
598                Kernel::RunningKernel(kernel) => match kernel.execution_state {
599                    ExecutionState::Idle => Color::Success,
600                    ExecutionState::Busy => Color::Modified,
601                },
602                Kernel::StartingKernel(_) => Color::Modified,
603                Kernel::ErroredLaunch(_) => Color::Error,
604                Kernel::ShuttingDown => Color::Modified,
605                Kernel::Shutdown => Color::Disabled,
606            })
607            .child(Label::new(self.kernel_specification.name.clone()))
608            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
609            .button(
610                Button::new("shutdown", "Shutdown")
611                    .style(ButtonStyle::Subtle)
612                    .disabled(self.kernel.is_shutting_down())
613                    .on_click(cx.listener(move |session, _, cx| {
614                        session.shutdown(cx);
615                    })),
616            )
617            .buttons(interrupt_button)
618    }
619}