session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
  9        RenderBlock,
 10    },
 11    scroll::Autoscroll,
 12    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 13};
 14use futures::{FutureExt as _, StreamExt as _};
 15use gpui::{
 16    div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
 17    WeakView,
 18};
 19use language::Point;
 20use project::Fs;
 21use runtimelib::{
 22    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 23};
 24use settings::Settings as _;
 25use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
 26use theme::{ActiveTheme, ThemeSettings};
 27use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, IconButtonShape, Label, Tooltip};
 28
 29pub struct Session {
 30    pub editor: WeakView<Editor>,
 31    pub kernel: Kernel,
 32    blocks: HashMap<String, EditorBlock>,
 33    pub messaging_task: Task<()>,
 34    pub kernel_specification: KernelSpecification,
 35    _buffer_subscription: Subscription,
 36}
 37
 38struct EditorBlock {
 39    editor: WeakView<Editor>,
 40    code_range: Range<Anchor>,
 41    invalidation_anchor: Anchor,
 42    block_id: CustomBlockId,
 43    execution_view: View<ExecutionView>,
 44    on_close: CloseBlockFn,
 45}
 46
 47type CloseBlockFn =
 48    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
 49
 50impl EditorBlock {
 51    fn new(
 52        editor: WeakView<Editor>,
 53        code_range: Range<Anchor>,
 54        status: ExecutionStatus,
 55        on_close: CloseBlockFn,
 56        cx: &mut ViewContext<Session>,
 57    ) -> anyhow::Result<Self> {
 58        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 59
 60        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 61            let buffer = editor.buffer().clone();
 62            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 63            let end_point = code_range.end.to_point(&buffer_snapshot);
 64            let next_row_start = end_point + Point::new(1, 0);
 65            if next_row_start > buffer_snapshot.max_point() {
 66                buffer.update(cx, |buffer, cx| {
 67                    buffer.edit(
 68                        [(
 69                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 70                            "\n",
 71                        )],
 72                        None,
 73                        cx,
 74                    )
 75                });
 76            }
 77
 78            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 79            let block = BlockProperties {
 80                position: code_range.end,
 81                height: execution_view.num_lines(cx).saturating_add(1),
 82                style: BlockStyle::Sticky,
 83                render: Self::create_output_area_render(execution_view.clone(), on_close.clone()),
 84                disposition: BlockDisposition::Below,
 85            };
 86
 87            let block_id = editor.insert_blocks([block], None, cx)[0];
 88            (block_id, invalidation_anchor)
 89        })?;
 90
 91        anyhow::Ok(Self {
 92            editor,
 93            code_range,
 94            invalidation_anchor,
 95            block_id,
 96            execution_view,
 97            on_close,
 98        })
 99    }
100
101    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
102        self.execution_view.update(cx, |execution_view, cx| {
103            execution_view.push_message(&message.content, cx);
104        });
105
106        self.editor
107            .update(cx, |editor, cx| {
108                let mut replacements = HashMap::default();
109
110                replacements.insert(
111                    self.block_id,
112                    (
113                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
114                        Self::create_output_area_render(
115                            self.execution_view.clone(),
116                            self.on_close.clone(),
117                        ),
118                    ),
119                );
120                editor.replace_blocks(replacements, None, cx);
121            })
122            .ok();
123    }
124
125    fn create_output_area_render(
126        execution_view: View<ExecutionView>,
127        on_close: CloseBlockFn,
128    ) -> RenderBlock {
129        let render = move |cx: &mut BlockContext| {
130            let execution_view = execution_view.clone();
131            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
132            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
133
134            let gutter = cx.gutter_dimensions;
135            let close_button_size = IconSize::XSmall;
136
137            let block_id = cx.block_id;
138            let on_close = on_close.clone();
139
140            let rem_size = cx.rem_size();
141            let line_height = cx.text_style().line_height_in_pixels(rem_size);
142
143            let (close_button_width, close_button_padding) =
144                close_button_size.square_components(cx);
145
146            div()
147                .min_h(line_height)
148                .flex()
149                .flex_row()
150                .items_start()
151                .w_full()
152                .bg(cx.theme().colors().background)
153                .border_y_1()
154                .border_color(cx.theme().colors().border)
155                .child(
156                    v_flex().min_h(cx.line_height()).justify_center().child(
157                        h_flex()
158                            .w(gutter.full_width())
159                            .justify_end()
160                            .pt(line_height / 2.)
161                            .child(
162                                h_flex()
163                                    .pr(gutter.width / 2. - close_button_width
164                                        + close_button_padding / 2.)
165                                    .child(
166                                        IconButton::new(
167                                            ("close_output_area", EntityId::from(cx.block_id)),
168                                            IconName::Close,
169                                        )
170                                        .shape(IconButtonShape::Square)
171                                        .icon_size(close_button_size)
172                                        .icon_color(Color::Muted)
173                                        .tooltip(|cx| Tooltip::text("Close output area", cx))
174                                        .on_click(
175                                            move |_, cx| {
176                                                if let BlockId::Custom(block_id) = block_id {
177                                                    (on_close)(block_id, cx)
178                                                }
179                                            },
180                                        ),
181                                    ),
182                            ),
183                    ),
184                )
185                .child(
186                    div()
187                        .flex_1()
188                        .size_full()
189                        .my_2()
190                        .mr(gutter.width)
191                        .text_size(text_font_size)
192                        .font_family(text_font)
193                        .child(execution_view),
194                )
195                .into_any_element()
196        };
197
198        Box::new(render)
199    }
200}
201
202impl Session {
203    pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
204        if let Some(working_directory) = editor
205            .upgrade()
206            .and_then(|editor| editor.read(cx).working_directory(cx))
207        {
208            working_directory
209        } else {
210            temp_dir()
211        }
212    }
213
214    pub fn new(
215        editor: WeakView<Editor>,
216        fs: Arc<dyn Fs>,
217        kernel_specification: KernelSpecification,
218        cx: &mut ViewContext<Self>,
219    ) -> Self {
220        let entity_id = editor.entity_id();
221
222        let kernel = RunningKernel::new(
223            kernel_specification.clone(),
224            entity_id,
225            Self::working_directory(editor.clone(), cx),
226            fs.clone(),
227            cx,
228        );
229
230        let pending_kernel = cx
231            .spawn(|this, mut cx| async move {
232                let kernel = kernel.await;
233
234                match kernel {
235                    Ok((mut kernel, mut messages_rx)) => {
236                        this.update(&mut cx, |this, cx| {
237                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
238
239                            let status = kernel.process.status();
240                            this.kernel = Kernel::RunningKernel(kernel);
241
242                            cx.spawn(|session, mut cx| async move {
243                                let error_message = match status.await {
244                                    Ok(status) => {
245                                        if status.success() {
246                                            log::info!("kernel process exited successfully");
247                                            return;
248                                        }
249
250                                        format!("kernel process exited with status: {:?}", status)
251                                    }
252                                    Err(err) => {
253                                        format!("kernel process exited with error: {:?}", err)
254                                    }
255                                };
256
257                                log::error!("{}", error_message);
258
259                                session
260                                    .update(&mut cx, |session, cx| {
261                                        session.kernel =
262                                            Kernel::ErroredLaunch(error_message.clone());
263
264                                        session.blocks.values().for_each(|block| {
265                                            block.execution_view.update(
266                                                cx,
267                                                |execution_view, cx| {
268                                                    match execution_view.status {
269                                                        ExecutionStatus::Finished => {
270                                                            // Do nothing when the output was good
271                                                        }
272                                                        _ => {
273                                                            // All other cases, set the status to errored
274                                                            execution_view.status =
275                                                                ExecutionStatus::KernelErrored(
276                                                                    error_message.clone(),
277                                                                )
278                                                        }
279                                                    }
280                                                    cx.notify();
281                                                },
282                                            );
283                                        });
284
285                                        cx.notify();
286                                    })
287                                    .ok();
288                            })
289                            .detach();
290
291                            this.messaging_task = cx.spawn(|session, mut cx| async move {
292                                while let Some(message) = messages_rx.next().await {
293                                    session
294                                        .update(&mut cx, |session, cx| {
295                                            session.route(&message, cx);
296                                        })
297                                        .ok();
298                                }
299                            });
300                        })
301                        .ok();
302                    }
303                    Err(err) => {
304                        this.update(&mut cx, |this, _cx| {
305                            this.kernel = Kernel::ErroredLaunch(err.to_string());
306                        })
307                        .ok();
308                    }
309                }
310            })
311            .shared();
312
313        let subscription = match editor.upgrade() {
314            Some(editor) => {
315                let buffer = editor.read(cx).buffer().clone();
316                cx.subscribe(&buffer, Self::on_buffer_event)
317            }
318            None => Subscription::new(|| {}),
319        };
320
321        return Self {
322            editor,
323            kernel: Kernel::StartingKernel(pending_kernel),
324            messaging_task: Task::ready(()),
325            blocks: HashMap::default(),
326            kernel_specification,
327            _buffer_subscription: subscription,
328        };
329    }
330
331    fn on_buffer_event(
332        &mut self,
333        buffer: Model<MultiBuffer>,
334        event: &multi_buffer::Event,
335        cx: &mut ViewContext<Self>,
336    ) {
337        if let multi_buffer::Event::Edited { .. } = event {
338            let snapshot = buffer.read(cx).snapshot(cx);
339
340            let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
341
342            self.blocks.retain(|_id, block| {
343                if block.invalidation_anchor.is_valid(&snapshot) {
344                    true
345                } else {
346                    blocks_to_remove.insert(block.block_id);
347                    false
348                }
349            });
350
351            if !blocks_to_remove.is_empty() {
352                self.editor
353                    .update(cx, |editor, cx| {
354                        editor.remove_blocks(blocks_to_remove, None, cx);
355                    })
356                    .ok();
357                cx.notify();
358            }
359        }
360    }
361
362    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
363        match &mut self.kernel {
364            Kernel::RunningKernel(kernel) => {
365                kernel.request_tx.try_send(message).ok();
366            }
367            _ => {}
368        }
369
370        anyhow::Ok(())
371    }
372
373    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
374        let blocks_to_remove: HashSet<CustomBlockId> =
375            self.blocks.values().map(|block| block.block_id).collect();
376
377        self.editor
378            .update(cx, |editor, cx| {
379                editor.remove_blocks(blocks_to_remove, None, cx);
380            })
381            .ok();
382
383        self.blocks.clear();
384    }
385
386    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
387        let editor = if let Some(editor) = self.editor.upgrade() {
388            editor
389        } else {
390            return;
391        };
392
393        if code.is_empty() {
394            return;
395        }
396
397        let execute_request = ExecuteRequest {
398            code: code.to_string(),
399            ..ExecuteRequest::default()
400        };
401
402        let message: JupyterMessage = execute_request.into();
403
404        let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
405
406        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
407
408        self.blocks.retain(|_key, block| {
409            if anchor_range.overlaps(&block.code_range, &buffer) {
410                blocks_to_remove.insert(block.block_id);
411                false
412            } else {
413                true
414            }
415        });
416
417        self.editor
418            .update(cx, |editor, cx| {
419                editor.remove_blocks(blocks_to_remove, None, cx);
420            })
421            .ok();
422
423        let status = match &self.kernel {
424            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
425            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
426            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
427            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
428            Kernel::Shutdown => ExecutionStatus::Shutdown,
429        };
430
431        let parent_message_id = message.header.msg_id.clone();
432        let session_view = cx.view().downgrade();
433        let weak_editor = self.editor.clone();
434
435        let on_close: CloseBlockFn =
436            Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
437                if let Some(session) = session_view.upgrade() {
438                    session.update(cx, |session, cx| {
439                        session.blocks.remove(&parent_message_id);
440                        cx.notify();
441                    });
442                }
443
444                if let Some(editor) = weak_editor.upgrade() {
445                    editor.update(cx, |editor, cx| {
446                        let mut block_ids = HashSet::default();
447                        block_ids.insert(block_id);
448                        editor.remove_blocks(block_ids, None, cx);
449                    });
450                }
451            });
452
453        let editor_block = if let Ok(editor_block) =
454            EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
455        {
456            editor_block
457        } else {
458            return;
459        };
460
461        let new_cursor_pos = editor_block.invalidation_anchor;
462
463        self.blocks
464            .insert(message.header.msg_id.clone(), editor_block);
465
466        match &self.kernel {
467            Kernel::RunningKernel(_) => {
468                self.send(message, cx).ok();
469            }
470            Kernel::StartingKernel(task) => {
471                // Queue up the execution as a task to run after the kernel starts
472                let task = task.clone();
473                let message = message.clone();
474
475                cx.spawn(|this, mut cx| async move {
476                    task.await;
477                    this.update(&mut cx, |this, cx| {
478                        this.send(message, cx).ok();
479                    })
480                    .ok();
481                })
482                .detach();
483            }
484            _ => {}
485        }
486
487        // Now move the cursor to after the block
488        editor.update(cx, move |editor, cx| {
489            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
490                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
491            });
492        });
493    }
494
495    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
496        let parent_message_id = match message.parent_header.as_ref() {
497            Some(header) => &header.msg_id,
498            None => return,
499        };
500
501        match &message.content {
502            JupyterMessageContent::Status(status) => {
503                self.kernel.set_execution_state(&status.execution_state);
504                cx.notify();
505            }
506            JupyterMessageContent::KernelInfoReply(reply) => {
507                self.kernel.set_kernel_info(&reply);
508                cx.notify();
509            }
510            _ => {}
511        }
512
513        if let Some(block) = self.blocks.get_mut(parent_message_id) {
514            block.handle_message(&message, cx);
515            return;
516        }
517    }
518
519    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
520        match &mut self.kernel {
521            Kernel::RunningKernel(_kernel) => {
522                self.send(InterruptRequest {}.into(), cx).ok();
523            }
524            Kernel::StartingKernel(_task) => {
525                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
526            }
527            _ => {}
528        }
529    }
530
531    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
532        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
533
534        match kernel {
535            Kernel::RunningKernel(mut kernel) => {
536                let mut request_tx = kernel.request_tx.clone();
537
538                cx.spawn(|this, mut cx| async move {
539                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
540                    request_tx.try_send(message).ok();
541
542                    // Give the kernel a bit of time to clean up
543                    cx.background_executor().timer(Duration::from_secs(3)).await;
544
545                    kernel.process.kill().ok();
546
547                    this.update(&mut cx, |this, cx| {
548                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
549                        this.clear_outputs(cx);
550                        this.kernel = Kernel::Shutdown;
551                        cx.notify();
552                    })
553                    .ok();
554                })
555                .detach();
556            }
557            Kernel::StartingKernel(_kernel) => {
558                self.kernel = Kernel::Shutdown;
559            }
560            _ => {
561                self.kernel = Kernel::Shutdown;
562            }
563        }
564        cx.notify();
565    }
566}
567
568pub enum SessionEvent {
569    Shutdown(WeakView<Editor>),
570}
571
572impl EventEmitter<SessionEvent> for Session {}
573
574impl Render for Session {
575    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
576        let mut buttons = vec![];
577
578        buttons.push(
579            ButtonLike::new("shutdown")
580                .child(Label::new("Shutdown"))
581                .style(ButtonStyle::Subtle)
582                .on_click(cx.listener(move |session, _, cx| {
583                    session.shutdown(cx);
584                })),
585        );
586
587        let status_text = match &self.kernel {
588            Kernel::RunningKernel(kernel) => {
589                buttons.push(
590                    ButtonLike::new("interrupt")
591                        .child(Label::new("Interrupt"))
592                        .style(ButtonStyle::Subtle)
593                        .on_click(cx.listener(move |session, _, cx| {
594                            session.interrupt(cx);
595                        })),
596                );
597                let mut name = self.kernel_specification.name.clone();
598
599                if let Some(info) = &kernel.kernel_info {
600                    name.push_str(" (");
601                    name.push_str(&info.language_info.name);
602                    name.push_str(")");
603                }
604                name
605            }
606            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
607            Kernel::ErroredLaunch(err) => {
608                format!("{} (Error: {})", self.kernel_specification.name, err)
609            }
610            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
611            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
612        };
613
614        return v_flex()
615            .gap_1()
616            .child(
617                h_flex()
618                    .gap_2()
619                    .child(self.kernel.dot())
620                    .child(Label::new(status_text)),
621            )
622            .child(h_flex().gap_2().children(buttons));
623    }
624}