session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    Anchor, AnchorRangeExt as _, Editor,
 11};
 12use futures::{FutureExt as _, StreamExt as _};
 13use gpui::{div, prelude::*, EventEmitter, Render, Task, View, ViewContext, WeakView};
 14use project::Fs;
 15use runtimelib::{
 16    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
 17    ShutdownRequest,
 18};
 19use settings::Settings as _;
 20use std::{ops::Range, sync::Arc, time::Duration};
 21use theme::{ActiveTheme, ThemeSettings};
 22use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 23
/// A kernel session attached to one editor: tracks the kernel's lifecycle state and the
/// output blocks created for the code that was executed through it.
pub struct Session {
    /// Weak handle to the editor whose code is executed and that hosts the output blocks.
    editor: WeakView<Editor>,
    /// Current kernel state (starting, running, errored launch, shutting down, or shut down).
    kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that forwards messages received from the running kernel to `Session::route`.
    messaging_task: Task<()>,
    /// Specification the kernel is launched from; its `name` is shown in the session UI.
    kernel_specification: KernelSpecification,
}
 31
/// An output area inserted into the editor below a range of executed code.
struct EditorBlock {
    /// Weak handle to the editor that contains the block.
    editor: WeakView<Editor>,
    /// Range of the code this block shows output for; the block is anchored at its end.
    code_range: Range<Anchor>,
    /// Identifier the editor returned when the block was inserted.
    block_id: BlockId,
    /// View that accumulates and renders the execution's messages.
    execution_view: View<ExecutionView>,
}
 38
 39impl EditorBlock {
 40    fn new(
 41        editor: WeakView<Editor>,
 42        code_range: Range<Anchor>,
 43        status: ExecutionStatus,
 44        cx: &mut ViewContext<Session>,
 45    ) -> anyhow::Result<Self> {
 46        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 47
 48        let block_id = editor.update(cx, |editor, cx| {
 49            let block = BlockProperties {
 50                position: code_range.end,
 51                height: execution_view.num_lines(cx).saturating_add(1),
 52                style: BlockStyle::Sticky,
 53                render: Self::create_output_area_render(execution_view.clone()),
 54                disposition: BlockDisposition::Below,
 55            };
 56
 57            editor.insert_blocks([block], None, cx)[0]
 58        })?;
 59
 60        anyhow::Ok(Self {
 61            editor,
 62            code_range,
 63            block_id,
 64            execution_view,
 65        })
 66    }
 67
 68    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 69        self.execution_view.update(cx, |execution_view, cx| {
 70            execution_view.push_message(&message.content, cx);
 71        });
 72
 73        self.editor
 74            .update(cx, |editor, cx| {
 75                let mut replacements = HashMap::default();
 76                replacements.insert(
 77                    self.block_id,
 78                    (
 79                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
 80                        Self::create_output_area_render(self.execution_view.clone()),
 81                    ),
 82                );
 83                editor.replace_blocks(replacements, None, cx);
 84            })
 85            .ok();
 86    }
 87
 88    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
 89        let render = move |cx: &mut BlockContext| {
 90            let execution_view = execution_view.clone();
 91            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
 92            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
 93
 94            let gutter_width = cx.gutter_dimensions.width;
 95
 96            h_flex()
 97                .w_full()
 98                .bg(cx.theme().colors().background)
 99                .border_y_1()
100                .border_color(cx.theme().colors().border)
101                .pl(gutter_width)
102                .child(
103                    div()
104                        .font_family(text_font)
105                        // .ml(gutter_width)
106                        .mx_1()
107                        .my_2()
108                        .h_full()
109                        .w_full()
110                        .mr(gutter_width)
111                        .child(execution_view),
112                )
113                .into_any_element()
114        };
115
116        Box::new(render)
117    }
118}
119
120impl Session {
121    pub fn new(
122        editor: WeakView<Editor>,
123        fs: Arc<dyn Fs>,
124        kernel_specification: KernelSpecification,
125        cx: &mut ViewContext<Self>,
126    ) -> Self {
127        let entity_id = editor.entity_id();
128        let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
129
130        let pending_kernel = cx
131            .spawn(|this, mut cx| async move {
132                let kernel = kernel.await;
133
134                match kernel {
135                    Ok((kernel, mut messages_rx)) => {
136                        this.update(&mut cx, |this, cx| {
137                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
138                            this.kernel = Kernel::RunningKernel(kernel);
139
140                            this.messaging_task = cx.spawn(|session, mut cx| async move {
141                                while let Some(message) = messages_rx.next().await {
142                                    session
143                                        .update(&mut cx, |session, cx| {
144                                            session.route(&message, cx);
145                                        })
146                                        .ok();
147                                }
148                            });
149
150                            // For some reason sending a kernel info request will brick the ark (R) kernel.
151                            // Note that Deno and Python do not have this issue.
152                            if this.kernel_specification.name == "ark" {
153                                return;
154                            }
155
156                            // Get kernel info after (possibly) letting the kernel start
157                            cx.spawn(|this, mut cx| async move {
158                                cx.background_executor()
159                                    .timer(Duration::from_millis(120))
160                                    .await;
161                                this.update(&mut cx, |this, _cx| {
162                                    this.send(KernelInfoRequest {}.into(), _cx).ok();
163                                })
164                                .ok();
165                            })
166                            .detach();
167                        })
168                        .ok();
169                    }
170                    Err(err) => {
171                        this.update(&mut cx, |this, _cx| {
172                            this.kernel = Kernel::ErroredLaunch(err.to_string());
173                        })
174                        .ok();
175                    }
176                }
177            })
178            .shared();
179
180        return Self {
181            editor,
182            kernel: Kernel::StartingKernel(pending_kernel),
183            messaging_task: Task::ready(()),
184            blocks: HashMap::default(),
185            kernel_specification,
186        };
187    }
188
189    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
190        match &mut self.kernel {
191            Kernel::RunningKernel(kernel) => {
192                kernel.request_tx.try_send(message).ok();
193            }
194            _ => {}
195        }
196
197        anyhow::Ok(())
198    }
199
200    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
201        let blocks_to_remove: HashSet<BlockId> =
202            self.blocks.values().map(|block| block.block_id).collect();
203
204        self.editor
205            .update(cx, |editor, cx| {
206                editor.remove_blocks(blocks_to_remove, None, cx);
207            })
208            .ok();
209
210        self.blocks.clear();
211    }
212
213    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
214        let editor = if let Some(editor) = self.editor.upgrade() {
215            editor
216        } else {
217            return;
218        };
219
220        let execute_request = ExecuteRequest {
221            code: code.to_string(),
222            ..ExecuteRequest::default()
223        };
224
225        let message: JupyterMessage = execute_request.into();
226
227        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
228
229        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
230
231        self.blocks.retain(|_key, block| {
232            if anchor_range.overlaps(&block.code_range, &buffer) {
233                blocks_to_remove.insert(block.block_id);
234                false
235            } else {
236                true
237            }
238        });
239
240        self.editor
241            .update(cx, |editor, cx| {
242                editor.remove_blocks(blocks_to_remove, None, cx);
243            })
244            .ok();
245
246        let status = match &self.kernel {
247            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
248            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
249            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
250            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
251            Kernel::Shutdown => ExecutionStatus::Shutdown,
252        };
253
254        let editor_block = if let Ok(editor_block) =
255            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
256        {
257            editor_block
258        } else {
259            return;
260        };
261
262        self.blocks
263            .insert(message.header.msg_id.clone(), editor_block);
264
265        match &self.kernel {
266            Kernel::RunningKernel(_) => {
267                self.send(message, cx).ok();
268            }
269            Kernel::StartingKernel(task) => {
270                // Queue up the execution as a task to run after the kernel starts
271                let task = task.clone();
272                let message = message.clone();
273
274                cx.spawn(|this, mut cx| async move {
275                    task.await;
276                    this.update(&mut cx, |this, cx| {
277                        this.send(message, cx).ok();
278                    })
279                    .ok();
280                })
281                .detach();
282            }
283            _ => {}
284        }
285    }
286
287    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
288        let parent_message_id = match message.parent_header.as_ref() {
289            Some(header) => &header.msg_id,
290            None => return,
291        };
292
293        match &message.content {
294            JupyterMessageContent::Status(status) => {
295                self.kernel.set_execution_state(&status.execution_state);
296                cx.notify();
297            }
298            JupyterMessageContent::KernelInfoReply(reply) => {
299                self.kernel.set_kernel_info(&reply);
300                cx.notify();
301            }
302            _ => {}
303        }
304
305        if let Some(block) = self.blocks.get_mut(parent_message_id) {
306            block.handle_message(&message, cx);
307            return;
308        }
309    }
310
311    fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
312        match &mut self.kernel {
313            Kernel::RunningKernel(_kernel) => {
314                self.send(InterruptRequest {}.into(), cx).ok();
315            }
316            Kernel::StartingKernel(_task) => {
317                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
318            }
319            _ => {}
320        }
321    }
322
323    fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
324        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
325
326        match kernel {
327            Kernel::RunningKernel(mut kernel) => {
328                let mut request_tx = kernel.request_tx.clone();
329
330                cx.spawn(|this, mut cx| async move {
331                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
332                    request_tx.try_send(message).ok();
333
334                    // Give the kernel a bit of time to clean up
335                    cx.background_executor().timer(Duration::from_secs(3)).await;
336
337                    kernel.process.kill().ok();
338
339                    this.update(&mut cx, |this, cx| {
340                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
341                        this.clear_outputs(cx);
342                        this.kernel = Kernel::Shutdown;
343                        cx.notify();
344                    })
345                    .ok();
346                })
347                .detach();
348            }
349            Kernel::StartingKernel(_kernel) => {
350                self.kernel = Kernel::Shutdown;
351            }
352            _ => {
353                self.kernel = Kernel::Shutdown;
354            }
355        }
356        cx.notify();
357    }
358}
359
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// Emitted once a running kernel has been shut down; carries the session's editor handle.
    Shutdown(WeakView<Editor>),
}
363
// Lets `Session` emit `SessionEvent`s through `cx.emit`.
impl EventEmitter<SessionEvent> for Session {}
365
366impl Render for Session {
367    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
368        let mut buttons = vec![];
369
370        buttons.push(
371            ButtonLike::new("shutdown")
372                .child(Label::new("Shutdown"))
373                .style(ButtonStyle::Subtle)
374                .on_click(cx.listener(move |session, _, cx| {
375                    session.shutdown(cx);
376                })),
377        );
378
379        let status_text = match &self.kernel {
380            Kernel::RunningKernel(kernel) => {
381                buttons.push(
382                    ButtonLike::new("interrupt")
383                        .child(Label::new("Interrupt"))
384                        .style(ButtonStyle::Subtle)
385                        .on_click(cx.listener(move |session, _, cx| {
386                            session.interrupt(cx);
387                        })),
388                );
389                let mut name = self.kernel_specification.name.clone();
390
391                if let Some(info) = &kernel.kernel_info {
392                    name.push_str(" (");
393                    name.push_str(&info.language_info.name);
394                    name.push_str(")");
395                }
396                name
397            }
398            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
399            Kernel::ErroredLaunch(err) => {
400                format!("{} (Error: {})", self.kernel_specification.name, err)
401            }
402            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
403            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
404        };
405
406        return v_flex()
407            .gap_1()
408            .child(
409                h_flex()
410                    .gap_2()
411                    .child(self.kernel.dot())
412                    .child(Label::new(status_text)),
413            )
414            .child(h_flex().gap_2().children(buttons));
415    }
416}