session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
  9        RenderBlock,
 10    },
 11    scroll::Autoscroll,
 12    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 13};
 14use futures::{FutureExt as _, StreamExt as _};
 15use gpui::{
 16    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 17    WeakView,
 18};
 19use language::Point;
 20use project::Fs;
 21use runtimelib::{
 22    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 23};
 24use settings::Settings as _;
 25use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
 26use theme::{ActiveTheme, ThemeSettings};
 27use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, IconButtonShape, Label, Tooltip};
 28
/// A REPL session that ties one editor to one kernel and tracks the output blocks it has
/// inserted into that editor.
pub struct Session {
    /// Weak handle to the editor that receives the output blocks.
    editor: WeakView<Editor>,
    /// Current kernel state; replaced as the kernel starts, runs, errors, or shuts down.
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that forwards incoming kernel messages to `route`; a ready no-op until the
    /// kernel has finished launching.
    messaging_task: Task<()>,
    /// Specification the kernel is launched from; its `name` is shown in the status line.
    pub kernel_specification: KernelSpecification,
    /// Subscription to the editor's buffer events (a no-op subscription when the editor was
    /// already gone at construction time).
    _buffer_subscription: Subscription,
}
 37
/// An output area inserted into the editor for a single code execution.
struct EditorBlock {
    /// Editor the block was inserted into.
    editor: WeakView<Editor>,
    /// Range of the executed code; later executions that overlap it replace this block.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the executed code. The block is removed
    /// once this anchor is no longer valid in the buffer.
    invalidation_anchor: Anchor,
    /// Id of the custom block inserted into the editor.
    block_id: CustomBlockId,
    /// View that accumulates kernel messages and renders the execution output.
    execution_view: View<ExecutionView>,
    /// Callback invoked when the block's close button is clicked.
    on_close: CloseBlockFn,
}
 46
 47type CloseBlockFn =
 48    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 49
 50impl EditorBlock {
 51    fn new(
 52        editor: WeakView<Editor>,
 53        code_range: Range<Anchor>,
 54        status: ExecutionStatus,
 55        on_close: CloseBlockFn,
 56        cx: &mut ViewContext<Session>,
 57    ) -> anyhow::Result<Self> {
 58        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 59
 60        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 61            let buffer = editor.buffer().clone();
 62            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 63            let end_point = code_range.end.to_point(&buffer_snapshot);
 64            let next_row_start = end_point + Point::new(1, 0);
 65            if next_row_start > buffer_snapshot.max_point() {
 66                buffer.update(cx, |buffer, cx| {
 67                    buffer.edit(
 68                        [(
 69                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 70                            "\n",
 71                        )],
 72                        None,
 73                        cx,
 74                    )
 75                });
 76            }
 77
 78            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 79            let block = BlockProperties {
 80                position: code_range.end,
 81                height: execution_view.num_lines(cx).saturating_add(1),
 82                style: BlockStyle::Sticky,
 83                render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
 84                disposition: BlockDisposition::Below,
 85            };
 86
 87            let block_id = editor.insert_blocks([block], None, cx)[0];
 88            (block_id, invalidation_anchor)
 89        })?;
 90
 91        anyhow::Ok(Self {
 92            editor,
 93            code_range,
 94            invalidation_anchor,
 95            block_id,
 96            execution_view,
 97            on_close,
 98        })
 99    }
100
101    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
102        self.execution_view.update(cx, |execution_view, cx| {
103            execution_view.push_message(&message.content, cx);
104        });
105
106        self.editor
107            .update(cx, |editor, cx| {
108                let mut replacements = HashMap::default();
109
110                replacements.insert(
111                    self.block_id,
112                    (
113                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
114                        Self::create_output_area_renderer(
115                            self.execution_view.clone(),
116                            self.on_close.clone(),
117                        ),
118                    ),
119                );
120                editor.replace_blocks(replacements, None, cx);
121            })
122            .ok();
123    }
124
125    fn create_output_area_renderer(
126        execution_view: View<ExecutionView>,
127        on_close: CloseBlockFn,
128    ) -> RenderBlock {
129        let render = move |cx: &mut BlockContext| {
130            let execution_view = execution_view.clone();
131            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
132            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
133
134            let gutter = cx.gutter_dimensions;
135            let close_button_size = IconSize::XSmall;
136
137            let block_id = cx.block_id;
138            let on_close = on_close.clone();
139
140            let rem_size = cx.rem_size();
141            let line_height = cx.text_style().line_height_in_pixels(rem_size);
142
143            let (close_button_width, close_button_padding) =
144                close_button_size.square_components(cx);
145
146            div()
147                .min_h(line_height)
148                .flex()
149                .flex_row()
150                .items_start()
151                .w_full()
152                .bg(cx.theme().colors().background)
153                .border_y_1()
154                .border_color(cx.theme().colors().border)
155                .child(
156                    v_flex().min_h(cx.line_height()).justify_center().child(
157                        h_flex()
158                            .w(gutter.full_width())
159                            .justify_end()
160                            .pt(line_height / 2.)
161                            .child(
162                                h_flex()
163                                    .pr(gutter.width / 2. - close_button_width
164                                        + close_button_padding / 2.)
165                                    .child(
166                                        IconButton::new(
167                                            ("close_output_area", EntityId::from(cx.block_id)),
168                                            IconName::Close,
169                                        )
170                                        .shape(IconButtonShape::Square)
171                                        .icon_size(close_button_size)
172                                        .icon_color(Color::Muted)
173                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
174                                        .on_click(
175                                            move |_, cx| {
176                                                if let BlockId::Custom(block_id) = block_id {
177                                                    (on_close)(block_id, cx)
178                                                }
179                                            },
180                                        ),
181                                    ),
182                            ),
183                    ),
184                )
185                .child(
186                    div()
187                        .flex_1()
188                        .size_full()
189                        .my_2()
190                        .mr(gutter.width)
191                        .text_size(text_font_size)
192                        .font_family(text_font)
193                        .child(execution_view),
194                )
195                .into_any_element()
196        };
197
198        Box::new(render)
199    }
200}
201
202impl Session {
203    pub fn new(
204        editor: WeakView<Editor>,
205        fs: Arc<dyn Fs>,
206        kernel_specification: KernelSpecification,
207        cx: &mut ViewContext<Self>,
208    ) -> Self {
209        let entity_id = editor.entity_id();
210        let working_directory = editor
211            .upgrade()
212            .and_then(|editor| editor.read(cx).working_directory(cx))
213            .unwrap_or_else(temp_dir);
214        let kernel = RunningKernel::new(
215            kernel_specification.clone(),
216            entity_id,
217            working_directory,
218            fs.clone(),
219            cx,
220        );
221
222        let pending_kernel = cx
223            .spawn(|this, mut cx| async move {
224                let kernel = kernel.await;
225
226                match kernel {
227                    Ok((mut kernel, mut messages_rx)) => {
228                        this.update(&mut cx, |this, cx| {
229                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
230
231                            let status = kernel.process.status();
232                            this.kernel = Kernel::RunningKernel(kernel);
233
234                            cx.spawn(|session, mut cx| async move {
235                                let error_message = match status.await {
236                                    Ok(status) => {
237                                        if status.success() {
238                                            log::info!("kernel process exited successfully");
239                                            return;
240                                        }
241
242                                        format!("kernel process exited with status: {:?}", status)
243                                    }
244                                    Err(err) => {
245                                        format!("kernel process exited with error: {:?}", err)
246                                    }
247                                };
248
249                                log::error!("{}", error_message);
250
251                                session
252                                    .update(&mut cx, |session, cx| {
253                                        session.kernel =
254                                            Kernel::ErroredLaunch(error_message.clone());
255
256                                        session.blocks.values().for_each(|block| {
257                                            block.execution_view.update(
258                                                cx,
259                                                |execution_view, cx| {
260                                                    match execution_view.status {
261                                                        ExecutionStatus::Finished => {
262                                                            // Do nothing when the output was good
263                                                        }
264                                                        _ => {
265                                                            // All other cases, set the status to errored
266                                                            execution_view.status =
267                                                                ExecutionStatus::KernelErrored(
268                                                                    error_message.clone(),
269                                                                )
270                                                        }
271                                                    }
272                                                    cx.notify();
273                                                },
274                                            );
275                                        });
276
277                                        cx.notify();
278                                    })
279                                    .ok();
280                            })
281                            .detach();
282
283                            this.messaging_task = cx.spawn(|session, mut cx| async move {
284                                while let Some(message) = messages_rx.next().await {
285                                    session
286                                        .update(&mut cx, |session, cx| {
287                                            session.route(&message, cx);
288                                        })
289                                        .ok();
290                                }
291                            });
292                        })
293                        .ok();
294                    }
295                    Err(err) => {
296                        this.update(&mut cx, |this, _cx| {
297                            this.kernel = Kernel::ErroredLaunch(err.to_string());
298                        })
299                        .ok();
300                    }
301                }
302            })
303            .shared();
304
305        let subscription = match editor.upgrade() {
306            Some(editor) => {
307                let buffer = editor.read(cx).buffer().clone();
308                cx.subscribe(&buffer, Self::on_buffer_event)
309            }
310            None => Subscription::new(|| {}),
311        };
312
313        return Self {
314            editor,
315            kernel: Kernel::StartingKernel(pending_kernel),
316            messaging_task: Task::ready(()),
317            blocks: HashMap::default(),
318            kernel_specification,
319            _buffer_subscription: subscription,
320        };
321    }
322
323    fn on_buffer_event(
324        &mut self,
325        buffer: Model<MultiBuffer>,
326        event: &multi_buffer::Event,
327        cx: &mut ViewContext<Self>,
328    ) {
329        if let multi_buffer::Event::Edited { .. } = event {
330            let snapshot = buffer.read(cx).snapshot(cx);
331
332            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
333
334            self.blocks.retain(|_id, block| {
335                if block.invalidation_anchor.is_valid(&snapshot) {
336                    true
337                } else {
338                    blocks_to_remove.insert(block.block_id);
339                    false
340                }
341            });
342
343            if !blocks_to_remove.is_empty() {
344                self.editor
345                    .update(cx, |editor, cx| {
346                        editor.remove_blocks(blocks_to_remove, None, cx);
347                    })
348                    .ok();
349                cx.notify();
350            }
351        }
352    }
353
354    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
355        match &mut self.kernel {
356            Kernel::RunningKernel(kernel) => {
357                kernel.request_tx.try_send(message).ok();
358            }
359            _ => {}
360        }
361
362        anyhow::Ok(())
363    }
364
365    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
366        let blocks_to_remove: HashSet<CustomBlockId> =
367            self.blocks.values().map(|block| block.block_id).collect();
368
369        self.editor
370            .update(cx, |editor, cx| {
371                editor.remove_blocks(blocks_to_remove, None, cx);
372            })
373            .ok();
374
375        self.blocks.clear();
376    }
377
378    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
379        let Some(editor) = self.editor.upgrade() else {
380            return;
381        };
382
383        if code.is_empty() {
384            return;
385        }
386
387        let execute_request = ExecuteRequest {
388            code: code.to_string(),
389            ..ExecuteRequest::default()
390        };
391
392        let message: JupyterMessage = execute_request.into();
393
394        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
395
396        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
397
398        self.blocks.retain(|_key, block| {
399            if anchor_range.overlaps(&block.code_range, &buffer) {
400                blocks_to_remove.insert(block.block_id);
401                false
402            } else {
403                true
404            }
405        });
406
407        self.editor
408            .update(cx, |editor, cx| {
409                editor.remove_blocks(blocks_to_remove, None, cx);
410            })
411            .ok();
412
413        let status = match &self.kernel {
414            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
415            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
416            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
417            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
418            Kernel::Shutdown => ExecutionStatus::Shutdown,
419        };
420
421        let parent_message_id = message.header.msg_id.clone();
422        let session_view = cx.view().downgrade();
423        let weak_editor = self.editor.clone();
424
425        let on_close: CloseBlockFn =
426            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
427                if let Some(session) = session_view.upgrade() {
428                    session.update(cx, |session, cx| {
429                        session.blocks.remove(&parent_message_id);
430                        cx.notify();
431                    });
432                }
433
434                if let Some(editor) = weak_editor.upgrade() {
435                    editor.update(cx, |editor, cx| {
436                        let mut block_ids = HashSet::default();
437                        block_ids.insert(block_id);
438                        editor.remove_blocks(block_ids, None, cx);
439                    });
440                }
441            });
442
443        let Ok(editor_block) =
444            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
445        else {
446            return;
447        };
448
449        let new_cursor_pos = editor_block.invalidation_anchor;
450
451        self.blocks
452            .insert(message.header.msg_id.clone(), editor_block);
453
454        match &self.kernel {
455            Kernel::RunningKernel(_) => {
456                self.send(message, cx).ok();
457            }
458            Kernel::StartingKernel(task) => {
459                // Queue up the execution as a task to run after the kernel starts
460                let task = task.clone();
461                let message = message.clone();
462
463                cx.spawn(|this, mut cx| async move {
464                    task.await;
465                    this.update(&mut cx, |this, cx| {
466                        this.send(message, cx).ok();
467                    })
468                    .ok();
469                })
470                .detach();
471            }
472            _ => {}
473        }
474
475        // Now move the cursor to after the block
476        editor.update(cx, move |editor, cx| {
477            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
478                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
479            });
480        });
481    }
482
483    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
484        let parent_message_id = match message.parent_header.as_ref() {
485            Some(header) => &header.msg_id,
486            None => return,
487        };
488
489        match &message.content {
490            JupyterMessageContent::Status(status) => {
491                self.kernel.set_execution_state(&status.execution_state);
492                cx.notify();
493            }
494            JupyterMessageContent::KernelInfoReply(reply) => {
495                self.kernel.set_kernel_info(&reply);
496                cx.notify();
497            }
498            _ => {}
499        }
500
501        if let Some(block) = self.blocks.get_mut(parent_message_id) {
502            block.handle_message(&message, cx);
503            return;
504        }
505    }
506
507    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
508        match &mut self.kernel {
509            Kernel::RunningKernel(_kernel) => {
510                self.send(InterruptRequest {}.into(), cx).ok();
511            }
512            Kernel::StartingKernel(_task) => {
513                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
514            }
515            _ => {}
516        }
517    }
518
519    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
520        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
521
522        match kernel {
523            Kernel::RunningKernel(mut kernel) => {
524                let mut request_tx = kernel.request_tx.clone();
525
526                cx.spawn(|this, mut cx| async move {
527                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
528                    request_tx.try_send(message).ok();
529
530                    // Give the kernel a bit of time to clean up
531                    cx.background_executor().timer(Duration::from_secs(3)).await;
532
533                    kernel.process.kill().ok();
534
535                    this.update(&mut cx, |this, cx| {
536                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
537                        this.clear_outputs(cx);
538                        this.kernel = Kernel::Shutdown;
539                        cx.notify();
540                    })
541                    .ok();
542                })
543                .detach();
544            }
545            Kernel::StartingKernel(_kernel) => {
546                self.kernel = Kernel::Shutdown;
547            }
548            _ => {
549                self.kernel = Kernel::Shutdown;
550            }
551        }
552        cx.notify();
553    }
554}
555
/// Events emitted by a `Session`.
pub enum SessionEvent {
    /// Emitted once a running kernel has been shut down; carries the session's editor handle.
    Shutdown(WeakView<Editor>),
}
559
// Lets `Session` emit `SessionEvent`s through its view context.
impl EventEmitter<SessionEvent> for Session {}
561
562impl Render for Session {
563    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
564        let mut buttons = vec![];
565
566        buttons.push(
567            ButtonLike::new("shutdown")
568                .child(Label::new("Shutdown"))
569                .style(ButtonStyle::Subtle)
570                .on_click(cx.listener(move |session, _, cx| {
571                    session.shutdown(cx);
572                })),
573        );
574
575        let status_text = match &self.kernel {
576            Kernel::RunningKernel(kernel) => {
577                buttons.push(
578                    ButtonLike::new("interrupt")
579                        .child(Label::new("Interrupt"))
580                        .style(ButtonStyle::Subtle)
581                        .on_click(cx.listener(move |session, _, cx| {
582                            session.interrupt(cx);
583                        })),
584                );
585                let mut name = self.kernel_specification.name.clone();
586
587                if let Some(info) = &kernel.kernel_info {
588                    name.push_str(" (");
589                    name.push_str(&info.language_info.name);
590                    name.push_str(")");
591                }
592                name
593            }
594            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
595            Kernel::ErroredLaunch(err) => {
596                format!("{} (Error: {})", self.kernel_specification.name, err)
597            }
598            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
599            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
600        };
601
602        return v_flex()
603            .gap_1()
604            .child(
605                h_flex()
606                    .gap_2()
607                    .child(self.kernel.dot())
608                    .child(Label::new(status_text)),
609            )
610            .child(h_flex().gap_2().children(buttons));
611    }
612}