session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    Anchor, AnchorRangeExt as _, Editor,
 11};
 12use futures::{FutureExt as _, StreamExt as _};
 13use gpui::{div, prelude::*, EventEmitter, Render, Task, View, ViewContext, WeakView};
 14use project::Fs;
 15use runtimelib::{
 16    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
 17    ShutdownRequest,
 18};
 19use settings::Settings as _;
 20use std::{ops::Range, sync::Arc, time::Duration};
 21use theme::{ActiveTheme, ThemeSettings};
 22use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 23
 24pub struct Session {
 25    editor: WeakView<Editor>,
 26    kernel: Kernel,
 27    blocks: HashMap<String, EditorBlock>,
 28    messaging_task: Task<()>,
 29    kernel_specification: KernelSpecification,
 30}
 31
 32struct EditorBlock {
 33    editor: WeakView<Editor>,
 34    code_range: Range<Anchor>,
 35    block_id: BlockId,
 36    execution_view: View<ExecutionView>,
 37}
 38
 39impl EditorBlock {
 40    fn new(
 41        editor: WeakView<Editor>,
 42        code_range: Range<Anchor>,
 43        status: ExecutionStatus,
 44        cx: &mut ViewContext<Session>,
 45    ) -> anyhow::Result<Self> {
 46        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 47
 48        let block_id = editor.update(cx, |editor, cx| {
 49            let block = BlockProperties {
 50                position: code_range.end,
 51                height: execution_view.num_lines(cx).saturating_add(1),
 52                style: BlockStyle::Sticky,
 53                render: Self::create_output_area_render(execution_view.clone()),
 54                disposition: BlockDisposition::Below,
 55            };
 56
 57            editor.insert_blocks([block], None, cx)[0]
 58        })?;
 59
 60        anyhow::Ok(Self {
 61            editor,
 62            code_range,
 63            block_id,
 64            execution_view,
 65        })
 66    }
 67
 68    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 69        self.execution_view.update(cx, |execution_view, cx| {
 70            execution_view.push_message(&message.content, cx);
 71        });
 72
 73        self.editor
 74            .update(cx, |editor, cx| {
 75                let mut replacements = HashMap::default();
 76                replacements.insert(
 77                    self.block_id,
 78                    (
 79                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
 80                        Self::create_output_area_render(self.execution_view.clone()),
 81                    ),
 82                );
 83                editor.replace_blocks(replacements, None, cx);
 84            })
 85            .ok();
 86    }
 87
 88    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
 89        let render = move |cx: &mut BlockContext| {
 90            let execution_view = execution_view.clone();
 91            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
 92            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
 93            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
 94
 95            let gutter_width = cx.gutter_dimensions.width;
 96
 97            h_flex()
 98                .w_full()
 99                .bg(cx.theme().colors().background)
100                .border_y_1()
101                .border_color(cx.theme().colors().border)
102                .pl(gutter_width)
103                .child(
104                    div()
105                        .text_size(text_font_size)
106                        .font_family(text_font)
107                        // .ml(gutter_width)
108                        .mx_1()
109                        .my_2()
110                        .h_full()
111                        .w_full()
112                        .mr(gutter_width)
113                        .child(execution_view),
114                )
115                .into_any_element()
116        };
117
118        Box::new(render)
119    }
120}
121
122impl Session {
123    pub fn new(
124        editor: WeakView<Editor>,
125        fs: Arc<dyn Fs>,
126        kernel_specification: KernelSpecification,
127        cx: &mut ViewContext<Self>,
128    ) -> Self {
129        let entity_id = editor.entity_id();
130        let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
131
132        let pending_kernel = cx
133            .spawn(|this, mut cx| async move {
134                let kernel = kernel.await;
135
136                match kernel {
137                    Ok((kernel, mut messages_rx)) => {
138                        this.update(&mut cx, |this, cx| {
139                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
140                            this.kernel = Kernel::RunningKernel(kernel);
141
142                            this.messaging_task = cx.spawn(|session, mut cx| async move {
143                                while let Some(message) = messages_rx.next().await {
144                                    session
145                                        .update(&mut cx, |session, cx| {
146                                            session.route(&message, cx);
147                                        })
148                                        .ok();
149                                }
150                            });
151
152                            // For some reason sending a kernel info request will brick the ark (R) kernel.
153                            // Note that Deno and Python do not have this issue.
154                            if this.kernel_specification.name == "ark" {
155                                return;
156                            }
157
158                            // Get kernel info after (possibly) letting the kernel start
159                            cx.spawn(|this, mut cx| async move {
160                                cx.background_executor()
161                                    .timer(Duration::from_millis(120))
162                                    .await;
163                                this.update(&mut cx, |this, _cx| {
164                                    this.send(KernelInfoRequest {}.into(), _cx).ok();
165                                })
166                                .ok();
167                            })
168                            .detach();
169                        })
170                        .ok();
171                    }
172                    Err(err) => {
173                        this.update(&mut cx, |this, _cx| {
174                            this.kernel = Kernel::ErroredLaunch(err.to_string());
175                        })
176                        .ok();
177                    }
178                }
179            })
180            .shared();
181
182        return Self {
183            editor,
184            kernel: Kernel::StartingKernel(pending_kernel),
185            messaging_task: Task::ready(()),
186            blocks: HashMap::default(),
187            kernel_specification,
188        };
189    }
190
191    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
192        match &mut self.kernel {
193            Kernel::RunningKernel(kernel) => {
194                kernel.request_tx.try_send(message).ok();
195            }
196            _ => {}
197        }
198
199        anyhow::Ok(())
200    }
201
202    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
203        let blocks_to_remove: HashSet<BlockId> =
204            self.blocks.values().map(|block| block.block_id).collect();
205
206        self.editor
207            .update(cx, |editor, cx| {
208                editor.remove_blocks(blocks_to_remove, None, cx);
209            })
210            .ok();
211
212        self.blocks.clear();
213    }
214
215    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
216        let editor = if let Some(editor) = self.editor.upgrade() {
217            editor
218        } else {
219            return;
220        };
221
222        let execute_request = ExecuteRequest {
223            code: code.to_string(),
224            ..ExecuteRequest::default()
225        };
226
227        let message: JupyterMessage = execute_request.into();
228
229        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
230
231        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
232
233        self.blocks.retain(|_key, block| {
234            if anchor_range.overlaps(&block.code_range, &buffer) {
235                blocks_to_remove.insert(block.block_id);
236                false
237            } else {
238                true
239            }
240        });
241
242        self.editor
243            .update(cx, |editor, cx| {
244                editor.remove_blocks(blocks_to_remove, None, cx);
245            })
246            .ok();
247
248        let status = match &self.kernel {
249            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
250            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
251            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
252            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
253            Kernel::Shutdown => ExecutionStatus::Shutdown,
254        };
255
256        let editor_block = if let Ok(editor_block) =
257            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
258        {
259            editor_block
260        } else {
261            return;
262        };
263
264        self.blocks
265            .insert(message.header.msg_id.clone(), editor_block);
266
267        match &self.kernel {
268            Kernel::RunningKernel(_) => {
269                self.send(message, cx).ok();
270            }
271            Kernel::StartingKernel(task) => {
272                // Queue up the execution as a task to run after the kernel starts
273                let task = task.clone();
274                let message = message.clone();
275
276                cx.spawn(|this, mut cx| async move {
277                    task.await;
278                    this.update(&mut cx, |this, cx| {
279                        this.send(message, cx).ok();
280                    })
281                    .ok();
282                })
283                .detach();
284            }
285            _ => {}
286        }
287    }
288
289    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
290        let parent_message_id = match message.parent_header.as_ref() {
291            Some(header) => &header.msg_id,
292            None => return,
293        };
294
295        match &message.content {
296            JupyterMessageContent::Status(status) => {
297                self.kernel.set_execution_state(&status.execution_state);
298                cx.notify();
299            }
300            JupyterMessageContent::KernelInfoReply(reply) => {
301                self.kernel.set_kernel_info(&reply);
302                cx.notify();
303            }
304            _ => {}
305        }
306
307        if let Some(block) = self.blocks.get_mut(parent_message_id) {
308            block.handle_message(&message, cx);
309            return;
310        }
311    }
312
313    fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
314        match &mut self.kernel {
315            Kernel::RunningKernel(_kernel) => {
316                self.send(InterruptRequest {}.into(), cx).ok();
317            }
318            Kernel::StartingKernel(_task) => {
319                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
320            }
321            _ => {}
322        }
323    }
324
325    fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
326        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
327
328        match kernel {
329            Kernel::RunningKernel(mut kernel) => {
330                let mut request_tx = kernel.request_tx.clone();
331
332                cx.spawn(|this, mut cx| async move {
333                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
334                    request_tx.try_send(message).ok();
335
336                    // Give the kernel a bit of time to clean up
337                    cx.background_executor().timer(Duration::from_secs(3)).await;
338
339                    kernel.process.kill().ok();
340
341                    this.update(&mut cx, |this, cx| {
342                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
343                        this.clear_outputs(cx);
344                        this.kernel = Kernel::Shutdown;
345                        cx.notify();
346                    })
347                    .ok();
348                })
349                .detach();
350            }
351            Kernel::StartingKernel(_kernel) => {
352                self.kernel = Kernel::Shutdown;
353            }
354            _ => {
355                self.kernel = Kernel::Shutdown;
356            }
357        }
358        cx.notify();
359    }
360}
361
362pub enum SessionEvent {
363    Shutdown(WeakView<Editor>),
364}
365
366impl EventEmitter<SessionEvent> for Session {}
367
368impl Render for Session {
369    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
370        let mut buttons = vec![];
371
372        buttons.push(
373            ButtonLike::new("shutdown")
374                .child(Label::new("Shutdown"))
375                .style(ButtonStyle::Subtle)
376                .on_click(cx.listener(move |session, _, cx| {
377                    session.shutdown(cx);
378                })),
379        );
380
381        let status_text = match &self.kernel {
382            Kernel::RunningKernel(kernel) => {
383                buttons.push(
384                    ButtonLike::new("interrupt")
385                        .child(Label::new("Interrupt"))
386                        .style(ButtonStyle::Subtle)
387                        .on_click(cx.listener(move |session, _, cx| {
388                            session.interrupt(cx);
389                        })),
390                );
391                let mut name = self.kernel_specification.name.clone();
392
393                if let Some(info) = &kernel.kernel_info {
394                    name.push_str(" (");
395                    name.push_str(&info.language_info.name);
396                    name.push_str(")");
397                }
398                name
399            }
400            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
401            Kernel::ErroredLaunch(err) => {
402                format!("{} (Error: {})", self.kernel_specification.name, err)
403            }
404            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
405            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
406        };
407
408        return v_flex()
409            .gap_1()
410            .child(
411                h_flex()
412                    .gap_2()
413                    .child(self.kernel.dot())
414                    .child(Label::new(status_text)),
415            )
416            .child(h_flex().gap_2().children(buttons));
417    }
418}