session.rs

  1use crate::components::KernelListItem;
  2use crate::{
  3    kernels::{Kernel, KernelSpecification, RunningKernel},
  4    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  5};
  6use collections::{HashMap, HashSet};
  7use editor::{
  8    display_map::{
  9        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
 10        RenderBlock,
 11    },
 12    scroll::Autoscroll,
 13    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 14};
 15use futures::{FutureExt as _, StreamExt as _};
 16use gpui::{
 17    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 18    WeakView,
 19};
 20use language::Point;
 21use project::Fs;
 22use runtimelib::{
 23    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
 24    ShutdownRequest,
 25};
 26use settings::Settings as _;
 27use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 28use theme::{ActiveTheme, ThemeSettings};
 29use ui::{prelude::*, IconButtonShape, Tooltip};
 30
 31pub struct Session {
 32    editor: WeakView<Editor>,
 33    pub kernel: Kernel,
 34    blocks: HashMap<String, EditorBlock>,
 35    messaging_task: Task<()>,
 36    pub kernel_specification: KernelSpecification,
 37    _buffer_subscription: Subscription,
 38}
 39
 40struct EditorBlock {
 41    editor: WeakView<Editor>,
 42    code_range: Range<Anchor>,
 43    invalidation_anchor: Anchor,
 44    block_id: CustomBlockId,
 45    execution_view: View<ExecutionView>,
 46    on_close: CloseBlockFn,
 47}
 48
 49type CloseBlockFn =
 50    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 51
 52impl EditorBlock {
 53    fn new(
 54        editor: WeakView<Editor>,
 55        code_range: Range<Anchor>,
 56        status: ExecutionStatus,
 57        on_close: CloseBlockFn,
 58        cx: &mut ViewContext<Session>,
 59    ) -> anyhow::Result<Self> {
 60        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 61
 62        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 63            let buffer = editor.buffer().clone();
 64            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 65            let end_point = code_range.end.to_point(&buffer_snapshot);
 66            let next_row_start = end_point + Point::new(1, 0);
 67            if next_row_start > buffer_snapshot.max_point() {
 68                buffer.update(cx, |buffer, cx| {
 69                    buffer.edit(
 70                        [(
 71                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 72                            "\n",
 73                        )],
 74                        None,
 75                        cx,
 76                    )
 77                });
 78            }
 79
 80            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 81            let block = BlockProperties {
 82                position: code_range.end,
 83                height: execution_view.num_lines(cx).saturating_add(1),
 84                style: BlockStyle::Sticky,
 85                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 86                disposition: BlockDisposition::Below,
 87            };
 88
 89            let block_id = editor.insert_blocks([block], None, cx)[0];
 90            (block_id, invalidation_anchor)
 91        })?;
 92
 93        anyhow::Ok(Self {
 94            editor,
 95            code_range,
 96            invalidation_anchor,
 97            block_id,
 98            execution_view,
 99            on_close,
100        })
101    }
102
103    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104        self.execution_view.update(cx, |execution_view, cx| {
105            execution_view.push_message(&message.content, cx);
106        });
107
108        self.editor
109            .update(cx, |editor, cx| {
110                let mut replacements = HashMap::default();
111
112                replacements.insert(
113                    self.block_id,
114                    (
115                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
116                        Self::create_output_area_renderer(
117                            self.execution_view.clone(),
118                            self.on_close.clone(),
119                        ),
120                    ),
121                );
122                editor.replace_blocks(replacements, None, cx);
123            })
124            .ok();
125    }
126
127    fn create_output_area_renderer(
128        execution_view: View<ExecutionView>,
129        on_close: CloseBlockFn,
130    ) -> RenderBlock {
131        let render = move |cx: &mut BlockContext| {
132            let execution_view = execution_view.clone();
133            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
134            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
135
136            let gutter = cx.gutter_dimensions;
137            let close_button_size = IconSize::XSmall;
138
139            let block_id = cx.block_id;
140            let on_close = on_close.clone();
141
142            let rem_size = cx.rem_size();
143            let line_height = cx.text_style().line_height_in_pixels(rem_size);
144
145            let (close_button_width, close_button_padding) =
146                close_button_size.square_components(cx);
147
148            div()
149                .min_h(line_height)
150                .flex()
151                .flex_row()
152                .items_start()
153                .w_full()
154                .bg(cx.theme().colors().background)
155                .border_y_1()
156                .border_color(cx.theme().colors().border)
157                .child(
158                    v_flex().min_h(cx.line_height()).justify_center().child(
159                        h_flex()
160                            .w(gutter.full_width())
161                            .justify_end()
162                            .pt(line_height / 2.)
163                            .child(
164                                h_flex()
165                                    .pr(gutter.width / 2. - close_button_width
166                                        + close_button_padding / 2.)
167                                    .child(
168                                        IconButton::new(
169                                            ("close_output_area", EntityId::from(cx.block_id)),
170                                            IconName::Close,
171                                        )
172                                        .shape(IconButtonShape::Square)
173                                        .icon_size(close_button_size)
174                                        .icon_color(Color::Muted)
175                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
176                                        .on_click(
177                                            move |_, cx| {
178                                                if let BlockId::Custom(block_id) = block_id {
179                                                    (on_close)(block_id, cx)
180                                                }
181                                            },
182                                        ),
183                                    ),
184                            ),
185                    ),
186                )
187                .child(
188                    div()
189                        .flex_1()
190                        .size_full()
191                        .my_2()
192                        .mr(gutter.width)
193                        .text_size(text_font_size)
194                        .font_family(text_font)
195                        .child(execution_view),
196                )
197                .into_any_element()
198        };
199
200        Box::new(render)
201    }
202}
203
204impl Session {
205    pub fn new(
206        editor: WeakView<Editor>,
207        fs: Arc<dyn Fs>,
208        kernel_specification: KernelSpecification,
209        cx: &mut ViewContext<Self>,
210    ) -> Self {
211        let entity_id = editor.entity_id();
212        let working_directory = editor
213            .upgrade()
214            .and_then(|editor| editor.read(cx).working_directory(cx))
215            .unwrap_or_else(temp_dir);
216        let kernel = RunningKernel::new(
217            kernel_specification.clone(),
218            entity_id,
219            working_directory,
220            fs.clone(),
221            cx,
222        );
223
224        let pending_kernel = cx
225            .spawn(|this, mut cx| async move {
226                let kernel = kernel.await;
227
228                match kernel {
229                    Ok((mut kernel, mut messages_rx)) => {
230                        this.update(&mut cx, |this, cx| {
231                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
232
233                            let status = kernel.process.status();
234                            this.kernel = Kernel::RunningKernel(kernel);
235
236                            cx.spawn(|session, mut cx| async move {
237                                let error_message = match status.await {
238                                    Ok(status) => {
239                                        if status.success() {
240                                            log::info!("kernel process exited successfully");
241                                            return;
242                                        }
243
244                                        format!("kernel process exited with status: {:?}", status)
245                                    }
246                                    Err(err) => {
247                                        format!("kernel process exited with error: {:?}", err)
248                                    }
249                                };
250
251                                log::error!("{}", error_message);
252
253                                session
254                                    .update(&mut cx, |session, cx| {
255                                        session.kernel =
256                                            Kernel::ErroredLaunch(error_message.clone());
257
258                                        session.blocks.values().for_each(|block| {
259                                            block.execution_view.update(
260                                                cx,
261                                                |execution_view, cx| {
262                                                    match execution_view.status {
263                                                        ExecutionStatus::Finished => {
264                                                            // Do nothing when the output was good
265                                                        }
266                                                        _ => {
267                                                            // All other cases, set the status to errored
268                                                            execution_view.status =
269                                                                ExecutionStatus::KernelErrored(
270                                                                    error_message.clone(),
271                                                                )
272                                                        }
273                                                    }
274                                                    cx.notify();
275                                                },
276                                            );
277                                        });
278
279                                        cx.notify();
280                                    })
281                                    .ok();
282                            })
283                            .detach();
284
285                            this.messaging_task = cx.spawn(|session, mut cx| async move {
286                                while let Some(message) = messages_rx.next().await {
287                                    session
288                                        .update(&mut cx, |session, cx| {
289                                            session.route(&message, cx);
290                                        })
291                                        .ok();
292                                }
293                            });
294                        })
295                        .ok();
296                    }
297                    Err(err) => {
298                        this.update(&mut cx, |this, _cx| {
299                            this.kernel = Kernel::ErroredLaunch(err.to_string());
300                        })
301                        .ok();
302                    }
303                }
304            })
305            .shared();
306
307        let subscription = match editor.upgrade() {
308            Some(editor) => {
309                let buffer = editor.read(cx).buffer().clone();
310                cx.subscribe(&buffer, Self::on_buffer_event)
311            }
312            None => Subscription::new(|| {}),
313        };
314
315        return Self {
316            editor,
317            kernel: Kernel::StartingKernel(pending_kernel),
318            messaging_task: Task::ready(()),
319            blocks: HashMap::default(),
320            kernel_specification,
321            _buffer_subscription: subscription,
322        };
323    }
324
325    fn on_buffer_event(
326        &mut self,
327        buffer: Model<MultiBuffer>,
328        event: &multi_buffer::Event,
329        cx: &mut ViewContext<Self>,
330    ) {
331        if let multi_buffer::Event::Edited { .. } = event {
332            let snapshot = buffer.read(cx).snapshot(cx);
333
334            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
335
336            self.blocks.retain(|_id, block| {
337                if block.invalidation_anchor.is_valid(&snapshot) {
338                    true
339                } else {
340                    blocks_to_remove.insert(block.block_id);
341                    false
342                }
343            });
344
345            if !blocks_to_remove.is_empty() {
346                self.editor
347                    .update(cx, |editor, cx| {
348                        editor.remove_blocks(blocks_to_remove, None, cx);
349                    })
350                    .ok();
351                cx.notify();
352            }
353        }
354    }
355
356    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
357        match &mut self.kernel {
358            Kernel::RunningKernel(kernel) => {
359                kernel.request_tx.try_send(message).ok();
360            }
361            _ => {}
362        }
363
364        anyhow::Ok(())
365    }
366
367    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
368        let blocks_to_remove: HashSet<CustomBlockId> =
369            self.blocks.values().map(|block| block.block_id).collect();
370
371        self.editor
372            .update(cx, |editor, cx| {
373                editor.remove_blocks(blocks_to_remove, None, cx);
374            })
375            .ok();
376
377        self.blocks.clear();
378    }
379
380    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
381        let Some(editor) = self.editor.upgrade() else {
382            return;
383        };
384
385        if code.is_empty() {
386            return;
387        }
388
389        let execute_request = ExecuteRequest {
390            code: code.to_string(),
391            ..ExecuteRequest::default()
392        };
393
394        let message: JupyterMessage = execute_request.into();
395
396        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
397
398        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
399
400        self.blocks.retain(|_key, block| {
401            if anchor_range.overlaps(&block.code_range, &buffer) {
402                blocks_to_remove.insert(block.block_id);
403                false
404            } else {
405                true
406            }
407        });
408
409        self.editor
410            .update(cx, |editor, cx| {
411                editor.remove_blocks(blocks_to_remove, None, cx);
412            })
413            .ok();
414
415        let status = match &self.kernel {
416            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
417            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
418            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
419            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
420            Kernel::Shutdown => ExecutionStatus::Shutdown,
421        };
422
423        let parent_message_id = message.header.msg_id.clone();
424        let session_view = cx.view().downgrade();
425        let weak_editor = self.editor.clone();
426
427        let on_close: CloseBlockFn =
428            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
429                if let Some(session) = session_view.upgrade() {
430                    session.update(cx, |session, cx| {
431                        session.blocks.remove(&parent_message_id);
432                        cx.notify();
433                    });
434                }
435
436                if let Some(editor) = weak_editor.upgrade() {
437                    editor.update(cx, |editor, cx| {
438                        let mut block_ids = HashSet::default();
439                        block_ids.insert(block_id);
440                        editor.remove_blocks(block_ids, None, cx);
441                    });
442                }
443            });
444
445        let Ok(editor_block) =
446            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
447        else {
448            return;
449        };
450
451        let new_cursor_pos = editor_block.invalidation_anchor;
452
453        self.blocks
454            .insert(message.header.msg_id.clone(), editor_block);
455
456        match &self.kernel {
457            Kernel::RunningKernel(_) => {
458                self.send(message, cx).ok();
459            }
460            Kernel::StartingKernel(task) => {
461                // Queue up the execution as a task to run after the kernel starts
462                let task = task.clone();
463                let message = message.clone();
464
465                cx.spawn(|this, mut cx| async move {
466                    task.await;
467                    this.update(&mut cx, |this, cx| {
468                        this.send(message, cx).ok();
469                    })
470                    .ok();
471                })
472                .detach();
473            }
474            _ => {}
475        }
476
477        // Now move the cursor to after the block
478        editor.update(cx, move |editor, cx| {
479            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
480                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
481            });
482        });
483    }
484
485    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
486        let parent_message_id = match message.parent_header.as_ref() {
487            Some(header) => &header.msg_id,
488            None => return,
489        };
490
491        match &message.content {
492            JupyterMessageContent::Status(status) => {
493                self.kernel.set_execution_state(&status.execution_state);
494                cx.notify();
495            }
496            JupyterMessageContent::KernelInfoReply(reply) => {
497                self.kernel.set_kernel_info(&reply);
498                cx.notify();
499            }
500            _ => {}
501        }
502
503        if let Some(block) = self.blocks.get_mut(parent_message_id) {
504            block.handle_message(&message, cx);
505            return;
506        }
507    }
508
509    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
510        match &mut self.kernel {
511            Kernel::RunningKernel(_kernel) => {
512                self.send(InterruptRequest {}.into(), cx).ok();
513            }
514            Kernel::StartingKernel(_task) => {
515                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
516            }
517            _ => {}
518        }
519    }
520
521    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
522        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
523
524        match kernel {
525            Kernel::RunningKernel(mut kernel) => {
526                let mut request_tx = kernel.request_tx.clone();
527
528                cx.spawn(|this, mut cx| async move {
529                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
530                    request_tx.try_send(message).ok();
531
532                    // Give the kernel a bit of time to clean up
533                    cx.background_executor().timer(Duration::from_secs(3)).await;
534
535                    kernel.process.kill().ok();
536
537                    this.update(&mut cx, |this, cx| {
538                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
539                        this.clear_outputs(cx);
540                        this.kernel = Kernel::Shutdown;
541                        cx.notify();
542                    })
543                    .ok();
544                })
545                .detach();
546            }
547            Kernel::StartingKernel(_kernel) => {
548                self.kernel = Kernel::Shutdown;
549            }
550            _ => {
551                self.kernel = Kernel::Shutdown;
552            }
553        }
554        cx.notify();
555    }
556}
557
558pub enum SessionEvent {
559    Shutdown(WeakView<Editor>),
560}
561
562impl EventEmitter<SessionEvent> for Session {}
563
564impl Render for Session {
565    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
566        let (status_text, interrupt_button) = match &self.kernel {
567            Kernel::RunningKernel(kernel) => (
568                kernel
569                    .kernel_info
570                    .as_ref()
571                    .map(|info| info.language_info.name.clone()),
572                Some(
573                    Button::new("interrupt", "Interrupt")
574                        .style(ButtonStyle::Subtle)
575                        .on_click(cx.listener(move |session, _, cx| {
576                            session.interrupt(cx);
577                        })),
578                ),
579            ),
580            Kernel::StartingKernel(_) => (Some("Starting".into()), None),
581            Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
582            Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
583            Kernel::Shutdown => (Some("Shutdown".into()), None),
584        };
585
586        KernelListItem::new(self.kernel_specification.clone())
587            .status_color(match &self.kernel {
588                Kernel::RunningKernel(kernel) => match kernel.execution_state {
589                    ExecutionState::Idle => Color::Success,
590                    ExecutionState::Busy => Color::Modified,
591                },
592                Kernel::StartingKernel(_) => Color::Modified,
593                Kernel::ErroredLaunch(_) => Color::Error,
594                Kernel::ShuttingDown => Color::Modified,
595                Kernel::Shutdown => Color::Disabled,
596            })
597            .child(Label::new(self.kernel_specification.name.clone()))
598            .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
599            .button(
600                Button::new("shutdown", "Shutdown")
601                    .style(ButtonStyle::Subtle)
602                    .disabled(self.kernel.is_shutting_down())
603                    .on_click(cx.listener(move |session, _, cx| {
604                        session.shutdown(cx);
605                    })),
606            )
607            .buttons(interrupt_button)
608    }
609}