session.rs

  1use crate::components::KernelListItem;
  2use crate::{
  3    kernels::{Kernel, KernelSpecification, RunningKernel},
  4    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  5};
  6use collections::{HashMap, HashSet};
  7use editor::{
  8    display_map::{
  9        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 10        RenderBlock,
 11    },
 12    scroll::Autoscroll,
 13    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 14};
 15use futures::{FutureExt as _, StreamExt as _};
 16use gpui::{
 17    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 18    WeakView,
 19};
 20use language::Point;
 21use project::Fs;
 22use runtimelib::{
 23    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 24    ShutdownRequest,
 25};
 26use settings::Settings as _;
 27use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 28use theme::{ActiveTheme, ThemeSettings};
 29use ui::{prelude::*, IconButtonShape, Tooltip};
 30
/// A REPL session: ties one editor to one kernel and tracks the output blocks
/// created by executing code from that editor.
pub struct Session {
    /// Weak handle to the editor whose display map hosts the output blocks.
    editor: WeakView<Editor>,
    /// Current lifecycle state of the kernel (starting, running, errored launch,
    /// shutting down, or shut down).
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created
    /// them; `route` looks blocks up by the parent message id of kernel replies.
    blocks: HashMap<String, EditorBlock>,
    /// Task that forwards messages received from the kernel to `route`.
    /// Initialized as an already-completed task until the kernel is running.
    messaging_task: Task<()>,
    /// Specification of the kernel this session was created with.
    pub kernel_specification: KernelSpecification,
    /// Subscription to the editor's buffer events (handled by `on_buffer_event`),
    /// stored so that it lives as long as the session.
    _buffer_subscription: Subscription,
}
 39
/// An output area inserted into the editor below a range of executed code.
struct EditorBlock {
    /// Weak handle to the editor the block was inserted into.
    editor: WeakView<Editor>,
    /// Range of the code that was executed; `Session::execute` removes blocks
    /// whose range overlaps newly executed code.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the executed code. The block is
    /// discarded once this anchor is no longer valid, and the cursor is moved
    /// here after an execution is started.
    invalidation_anchor: Anchor,
    /// Id of the custom block in the editor's display map.
    block_id: CustomBlockId,
    /// View that accumulates and renders the kernel's output for this execution.
    execution_view: View<ExecutionView>,
    /// Callback invoked when the block's close button is clicked.
    on_close: CloseBlockFn,
}
 48
/// Shared callback invoked with the id of an output block when the user clicks
/// its close button.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 51
 52impl EditorBlock {
 53    fn new(
 54        editor: WeakView<Editor>,
 55        code_range: Range<Anchor>,
 56        status: ExecutionStatus,
 57        on_close: CloseBlockFn,
 58        cx: &mut ViewContext<Session>,
 59    ) -> anyhow::Result<Self> {
 60        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 61
 62        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 63            let buffer = editor.buffer().clone();
 64            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 65            let end_point = code_range.end.to_point(&buffer_snapshot);
 66            let next_row_start = end_point + Point::new(1, 0);
 67            if next_row_start > buffer_snapshot.max_point() {
 68                buffer.update(cx, |buffer, cx| {
 69                    buffer.edit(
 70                        [(
 71                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 72                            "\n",
 73                        )],
 74                        None,
 75                        cx,
 76                    )
 77                });
 78            }
 79
 80            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 81            let block = BlockProperties {
 82                position: code_range.end,
 83                height: execution_view.num_lines(cx).saturating_add(1),
 84                style: BlockStyle::Sticky,
 85                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 86                disposition: BlockDisposition::Below,
 87            };
 88
 89            let block_id = editor.insert_blocks([block], None, cx)[0];
 90            (block_id, invalidation_anchor)
 91        })?;
 92
 93        anyhow::Ok(Self {
 94            editor,
 95            code_range,
 96            invalidation_anchor,
 97            block_id,
 98            execution_view,
 99            on_close,
100        })
101    }
102
103    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104        self.execution_view.update(cx, |execution_view, cx| {
105            execution_view.push_message(&message.content, cx);
106        });
107
108        self.editor
109            .update(cx, |editor, cx| {
110                let mut replacements = HashMap::default();
111
112                replacements.insert(
113                    self.block_id,
114                    (
115                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
116                        Self::create_output_area_renderer(
117                            self.execution_view.clone(),
118                            self.on_close.clone(),
119                        ),
120                    ),
121                );
122                editor.replace_blocks(replacements, None, cx);
123            })
124            .ok();
125    }
126
    /// Builds the render callback for an output block.
    ///
    /// The rendered element is a bordered row with two children: a gutter-wide
    /// column holding a close button, and the `execution_view` drawn with the
    /// buffer font settings. Clicking the close button calls `on_close` with the
    /// block's id, provided the block is a custom block.
    fn create_output_area_renderer(
        execution_view: View<ExecutionView>,
        on_close: CloseBlockFn,
    ) -> RenderBlock {
        let render = move |cx: &mut BlockContext| {
            // Clone the captured handles: this closure is stored and the values
            // below are moved into the element built on each call.
            let execution_view = execution_view.clone();
            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;

            let gutter = cx.gutter_dimensions;
            let close_button_size = IconSize::XSmall;

            let block_id = cx.block_id;
            let on_close = on_close.clone();

            let rem_size = cx.rem_size();
            let line_height = cx.text_style().line_height_in_pixels(rem_size);

            let (close_button_width, close_button_padding) =
                close_button_size.square_components(cx);

            div()
                .min_h(line_height)
                .flex()
                .flex_row()
                .items_start()
                .w_full()
                .bg(cx.theme().colors().background)
                .border_y_1()
                .border_color(cx.theme().colors().border)
                .child(
                    // Left column: as wide as the editor gutter, with the close
                    // button right-aligned inside it.
                    v_flex().min_h(cx.line_height()).justify_center().child(
                        h_flex()
                            .w(gutter.full_width())
                            .justify_end()
                            .pt(line_height / 2.)
                            .child(
                                h_flex()
                                    .pr(gutter.width / 2. - close_button_width
                                        + close_button_padding / 2.)
                                    .child(
                                        IconButton::new(
                                            ("close_output_area", EntityId::from(cx.block_id)),
                                            IconName::Close,
                                        )
                                        .shape(IconButtonShape::Square)
                                        .icon_size(close_button_size)
                                        .icon_color(Color::Muted)
                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
                                        .on_click(
                                            move |_, cx| {
                                                // Only custom blocks carry an id that
                                                // `on_close` can act on.
                                                if let BlockId::Custom(block_id) = block_id {
                                                    (on_close)(block_id, cx)
                                                }
                                            },
                                        ),
                                    ),
                            ),
                    ),
                )
                .child(
                    // Right column: the execution output, rendered with the
                    // buffer font family and size.
                    div()
                        .flex_1()
                        .size_full()
                        .my_2()
                        .mr(gutter.width)
                        .text_size(text_font_size)
                        .font_family(text_font)
                        .child(execution_view),
                )
                .into_any_element()
        };

        Box::new(render)
    }
202}
203
204impl Session {
205    pub fn new(
206        editor: WeakView<Editor>,
207        fs: Arc<dyn Fs>,
208        kernel_specification: KernelSpecification,
209        cx: &mut ViewContext<Self>,
210    ) -> Self {
211        let entity_id = editor.entity_id();
212        let working_directory = editor
213            .upgrade()
214            .and_then(|editor| editor.read(cx).working_directory(cx))
215            .unwrap_or_else(temp_dir);
216        let kernel = RunningKernel::new(
217            kernel_specification.clone(),
218            entity_id,
219            working_directory,
220            fs.clone(),
221            cx,
222        );
223
224        let pending_kernel = cx
225            .spawn(|this, mut cx| async move {
226                let kernel = kernel.await;
227
228                match kernel {
229                    Ok((mut kernel, mut messages_rx)) => {
230                        this.update(&mut cx, |this, cx| {
231                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
232
233                            let status = kernel.process.status();
234                            this.kernel = Kernel::RunningKernel(kernel);
235
236                            cx.spawn(|session, mut cx| async move {
237                                let error_message = match status.await {
238                                    Ok(status) => {
239                                        if status.success() {
240                                            log::info!("kernel process exited successfully");
241                                            return;
242                                        }
243
244                                        format!("kernel process exited with status: {:?}", status)
245                                    }
246                                    Err(err) => {
247                                        format!("kernel process exited with error: {:?}", err)
248                                    }
249                                };
250
251                                log::error!("{}", error_message);
252
253                                session
254                                    .update(&mut cx, |session, cx| {
255                                        session.kernel =
256                                            Kernel::ErroredLaunch(error_message.clone());
257
258                                        session.blocks.values().for_each(|block| {
259                                            block.execution_view.update(
260                                                cx,
261                                                |execution_view, cx| {
262                                                    match execution_view.status {
263                                                        ExecutionStatus::Finished => {
264                                                            // Do nothing when the output was good
265                                                        }
266                                                        _ => {
267                                                            // All other cases, set the status to errored
268                                                            execution_view.status =
269                                                                ExecutionStatus::KernelErrored(
270                                                                    error_message.clone(),
271                                                                )
272                                                        }
273                                                    }
274                                                    cx.notify();
275                                                },
276                                            );
277                                        });
278
279                                        cx.notify();
280                                    })
281                                    .ok();
282                            })
283                            .detach();
284
285                            this.messaging_task = cx.spawn(|session, mut cx| async move {
286                                while let Some(message) = messages_rx.next().await {
287                                    session
288                                        .update(&mut cx, |session, cx| {
289                                            session.route(&message, cx);
290                                        })
291                                        .ok();
292                                }
293                            });
294                        })
295                        .ok();
296                    }
297                    Err(err) => {
298                        this.update(&mut cx, |this, _cx| {
299                            this.kernel = Kernel::ErroredLaunch(err.to_string());
300                        })
301                        .ok();
302                    }
303                }
304            })
305            .shared();
306
307        let subscription = match editor.upgrade() {
308            Some(editor) => {
309                let buffer = editor.read(cx).buffer().clone();
310                cx.subscribe(&buffer, Self::on_buffer_event)
311            }
312            None => Subscription::new(|| {}),
313        };
314
315        return Self {
316            editor,
317            kernel: Kernel::StartingKernel(pending_kernel),
318            messaging_task: Task::ready(()),
319            blocks: HashMap::default(),
320            kernel_specification,
321            _buffer_subscription: subscription,
322        };
323    }
324
325    fn on_buffer_event(
326        &mut self,
327        buffer: Model<MultiBuffer>,
328        event: &multi_buffer::Event,
329        cx: &mut ViewContext<Self>,
330    ) {
331        if let multi_buffer::Event::Edited { .. } = event {
332            let snapshot = buffer.read(cx).snapshot(cx);
333
334            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
335
336            self.blocks.retain(|_id, block| {
337                if block.invalidation_anchor.is_valid(&snapshot) {
338                    true
339                } else {
340                    blocks_to_remove.insert(block.block_id);
341                    false
342                }
343            });
344
345            if !blocks_to_remove.is_empty() {
346                self.editor
347                    .update(cx, |editor, cx| {
348                        editor.remove_blocks(blocks_to_remove, None, cx);
349                    })
350                    .ok();
351                cx.notify();
352            }
353        }
354    }
355
356    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
357        match &mut self.kernel {
358            Kernel::RunningKernel(kernel) => {
359                kernel.request_tx.try_send(message).ok();
360            }
361            _ => {}
362        }
363
364        anyhow::Ok(())
365    }
366
367    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
368        let blocks_to_remove: HashSet<CustomBlockId> =
369            self.blocks.values().map(|block| block.block_id).collect();
370
371        self.editor
372            .update(cx, |editor, cx| {
373                editor.remove_blocks(blocks_to_remove, None, cx);
374            })
375            .ok();
376
377        self.blocks.clear();
378    }
379
380    pub fn execute(
381        &mut self,
382        code: String,
383        anchor_range: Range<Anchor>,
384        cx: &mut ViewContext<Self>,
385    ) {
386        let Some(editor) = self.editor.upgrade() else {
387            return;
388        };
389
390        if code.is_empty() {
391            return;
392        }
393
394        let execute_request = ExecuteRequest {
395            code,
396            ..ExecuteRequest::default()
397        };
398
399        let message: JupyterMessage = execute_request.into();
400
401        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
402
403        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
404
405        self.blocks.retain(|_key, block| {
406            if anchor_range.overlaps(&block.code_range, &buffer) {
407                blocks_to_remove.insert(block.block_id);
408                false
409            } else {
410                true
411            }
412        });
413
414        self.editor
415            .update(cx, |editor, cx| {
416                editor.remove_blocks(blocks_to_remove, None, cx);
417            })
418            .ok();
419
420        let status = match &self.kernel {
421            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
422            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
423            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
424            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
425            Kernel::Shutdown => ExecutionStatus::Shutdown,
426        };
427
428        let parent_message_id = message.header.msg_id.clone();
429        let session_view = cx.view().downgrade();
430        let weak_editor = self.editor.clone();
431
432        let on_close: CloseBlockFn =
433            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
434                if let Some(session) = session_view.upgrade() {
435                    session.update(cx, |session, cx| {
436                        session.blocks.remove(&parent_message_id);
437                        cx.notify();
438                    });
439                }
440
441                if let Some(editor) = weak_editor.upgrade() {
442                    editor.update(cx, |editor, cx| {
443                        let mut block_ids = HashSet::default();
444                        block_ids.insert(block_id);
445                        editor.remove_blocks(block_ids, None, cx);
446                    });
447                }
448            });
449
450        let Ok(editor_block) =
451            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
452        else {
453            return;
454        };
455
456        let new_cursor_pos = editor_block.invalidation_anchor;
457
458        self.blocks
459            .insert(message.header.msg_id.clone(), editor_block);
460
461        match &self.kernel {
462            Kernel::RunningKernel(_) => {
463                self.send(message, cx).ok();
464            }
465            Kernel::StartingKernel(task) => {
466                // Queue up the execution as a task to run after the kernel starts
467                let task = task.clone();
468                let message = message.clone();
469
470                cx.spawn(|this, mut cx| async move {
471                    task.await;
472                    this.update(&mut cx, |this, cx| {
473                        this.send(message, cx).ok();
474                    })
475                    .ok();
476                })
477                .detach();
478            }
479            _ => {}
480        }
481
482        // Now move the cursor to after the block
483        editor.update(cx, move |editor, cx| {
484            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
485                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
486            });
487        });
488    }
489
490    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
491        let parent_message_id = match message.parent_header.as_ref() {
492            Some(header) => &header.msg_id,
493            None => return,
494        };
495
496        match &message.content {
497            JupyterMessageContent::Status(status) => {
498                self.kernel.set_execution_state(&status.execution_state);
499                cx.notify();
500            }
501            JupyterMessageContent::KernelInfoReply(reply) => {
502                self.kernel.set_kernel_info(&reply);
503                cx.notify();
504            }
505            _ => {}
506        }
507
508        if let Some(block) = self.blocks.get_mut(parent_message_id) {
509            block.handle_message(&message, cx);
510            return;
511        }
512    }
513
514    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
515        match &mut self.kernel {
516            Kernel::RunningKernel(_kernel) => {
517                self.send(InterruptRequest {}.into(), cx).ok();
518            }
519            Kernel::StartingKernel(_task) => {
520                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
521            }
522            _ => {}
523        }
524    }
525
526    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
527        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
528
529        match kernel {
530            Kernel::RunningKernel(mut kernel) => {
531                let mut request_tx = kernel.request_tx.clone();
532
533                cx.spawn(|this, mut cx| async move {
534                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
535                    request_tx.try_send(message).ok();
536
537                    // Give the kernel a bit of time to clean up
538                    cx.background_executor().timer(Duration::from_secs(3)).await;
539
540                    kernel.process.kill().ok();
541
542                    this.update(&mut cx, |this, cx| {
543                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
544                        this.clear_outputs(cx);
545                        this.kernel = Kernel::Shutdown;
546                        cx.notify();
547                    })
548                    .ok();
549                })
550                .detach();
551            }
552            Kernel::StartingKernel(_kernel) => {
553                self.kernel = Kernel::Shutdown;
554            }
555            _ => {
556                self.kernel = Kernel::Shutdown;
557            }
558        }
559        cx.notify();
560    }
561}
562
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// The session's kernel has been shut down; carries the weak handle of the
    /// editor the session belonged to.
    Shutdown(WeakView<Editor>),
}
566
// Allows `Session` to emit `SessionEvent`s (see `cx.emit` in `Session::shutdown`).
impl EventEmitter<SessionEvent> for Session {}
568
569impl Render for Session {
570    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
571        let (status_text, interrupt_button) = match &self.kernel {
572            Kernel::RunningKernel(kernel) => (
573                kernel
574                    .kernel_info
575                    .as_ref()
576                    .map(|info| info.language_info.name.clone()),
577                Some(
578                    Button::new("interrupt", "Interrupt")
579                        .style(ButtonStyle::Subtle)
580                        .on_click(cx.listener(move |session, _, cx| {
581                            session.interrupt(cx);
582                        })),
583                ),
584            ),
585            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
586            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
587            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
588            Kernel::Shutdown => (Some("Shutdown".into()), None),
589        };
590
591        KernelListItem::new(self.kernel_specification.clone())
592            .status_color(match &self.kernel {
593                Kernel::RunningKernel(kernel) => match kernel.execution_state {
594                    ExecutionState::Idle => Color::Success,
595                    ExecutionState::Busy => Color::Modified,
596                },
597                Kernel::StartingKernel(_) => Color::Modified,
598                Kernel::ErroredLaunch(_) => Color::Error,
599                Kernel::ShuttingDown => Color::Modified,
600                Kernel::Shutdown => Color::Disabled,
601            })
602            .child(Label::new(self.kernel_specification.name.clone()))
603            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
604            .button(
605                Button::new("shutdown", "Shutdown")
606                    .style(ButtonStyle::Subtle)
607                    .disabled(self.kernel.is_shutting_down())
608                    .on_click(cx.listener(move |session, _, cx| {
609                        session.shutdown(cx);
610                    })),
611            )
612            .buttons(interrupt_button)
613    }
614}