session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    Anchor, AnchorRangeExt as _, Editor,
 11};
 12use futures::{FutureExt as _, StreamExt as _};
 13use gpui::{div, prelude::*, Entity, EventEmitter, Render, Task, View, ViewContext};
 14use project::Fs;
 15use runtimelib::{
 16    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
 17    ShutdownRequest,
 18};
 19use settings::Settings as _;
 20use std::{ops::Range, sync::Arc, time::Duration};
 21use theme::{ActiveTheme, ThemeSettings};
 22use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 23
 24pub struct Session {
 25    editor: View<Editor>,
 26    kernel: Kernel,
 27    blocks: HashMap<String, EditorBlock>,
 28    messaging_task: Task<()>,
 29    kernel_specification: KernelSpecification,
 30}
 31
 32#[derive(Debug)]
 33struct EditorBlock {
 34    editor: View<Editor>,
 35    code_range: Range<Anchor>,
 36    block_id: BlockId,
 37    execution_view: View<ExecutionView>,
 38}
 39
 40impl EditorBlock {
 41    fn new(
 42        editor: View<Editor>,
 43        code_range: Range<Anchor>,
 44        status: ExecutionStatus,
 45        cx: &mut ViewContext<Session>,
 46    ) -> Self {
 47        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 48
 49        let block_id = editor.update(cx, |editor, cx| {
 50            let block = BlockProperties {
 51                position: code_range.end,
 52                height: execution_view.num_lines(cx).saturating_add(1),
 53                style: BlockStyle::Sticky,
 54                render: Self::create_output_area_render(execution_view.clone()),
 55                disposition: BlockDisposition::Below,
 56            };
 57
 58            editor.insert_blocks([block], None, cx)[0]
 59        });
 60
 61        Self {
 62            editor,
 63            code_range,
 64            block_id,
 65            execution_view,
 66        }
 67    }
 68
 69    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 70        self.execution_view.update(cx, |execution_view, cx| {
 71            execution_view.push_message(&message.content, cx);
 72        });
 73
 74        self.editor.update(cx, |editor, cx| {
 75            let mut replacements = HashMap::default();
 76            replacements.insert(
 77                self.block_id,
 78                (
 79                    Some(self.execution_view.num_lines(cx).saturating_add(1)),
 80                    Self::create_output_area_render(self.execution_view.clone()),
 81                ),
 82            );
 83            editor.replace_blocks(replacements, None, cx);
 84        })
 85    }
 86
 87    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
 88        let render = move |cx: &mut BlockContext| {
 89            let execution_view = execution_view.clone();
 90            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
 91            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
 92
 93            let gutter_width = cx.gutter_dimensions.width;
 94
 95            h_flex()
 96                .w_full()
 97                .bg(cx.theme().colors().background)
 98                .border_y_1()
 99                .border_color(cx.theme().colors().border)
100                .pl(gutter_width)
101                .child(
102                    div()
103                        .font_family(text_font)
104                        // .ml(gutter_width)
105                        .mx_1()
106                        .my_2()
107                        .h_full()
108                        .w_full()
109                        .mr(gutter_width)
110                        .child(execution_view),
111                )
112                .into_any_element()
113        };
114
115        Box::new(render)
116    }
117}
118
119impl Session {
120    pub fn new(
121        editor: View<Editor>,
122        fs: Arc<dyn Fs>,
123        kernel_specification: KernelSpecification,
124        cx: &mut ViewContext<Self>,
125    ) -> Self {
126        let entity_id = editor.entity_id();
127        let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
128
129        let pending_kernel = cx
130            .spawn(|this, mut cx| async move {
131                let kernel = kernel.await;
132
133                match kernel {
134                    Ok((kernel, mut messages_rx)) => {
135                        this.update(&mut cx, |this, cx| {
136                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
137                            this.kernel = Kernel::RunningKernel(kernel);
138
139                            this.messaging_task = cx.spawn(|session, mut cx| async move {
140                                while let Some(message) = messages_rx.next().await {
141                                    session
142                                        .update(&mut cx, |session, cx| {
143                                            session.route(&message, cx);
144                                        })
145                                        .ok();
146                                }
147                            });
148
149                            // For some reason sending a kernel info request will brick the ark (R) kernel.
150                            // Note that Deno and Python do not have this issue.
151                            if this.kernel_specification.name == "ark" {
152                                return;
153                            }
154
155                            // Get kernel info after (possibly) letting the kernel start
156                            cx.spawn(|this, mut cx| async move {
157                                cx.background_executor()
158                                    .timer(Duration::from_millis(120))
159                                    .await;
160                                this.update(&mut cx, |this, _cx| {
161                                    this.send(KernelInfoRequest {}.into(), _cx).ok();
162                                })
163                                .ok();
164                            })
165                            .detach();
166                        })
167                        .ok();
168                    }
169                    Err(err) => {
170                        this.update(&mut cx, |this, _cx| {
171                            this.kernel = Kernel::ErroredLaunch(err.to_string());
172                        })
173                        .ok();
174                    }
175                }
176            })
177            .shared();
178
179        return Self {
180            editor,
181            kernel: Kernel::StartingKernel(pending_kernel),
182            messaging_task: Task::ready(()),
183            blocks: HashMap::default(),
184            kernel_specification,
185        };
186    }
187
188    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
189        match &mut self.kernel {
190            Kernel::RunningKernel(kernel) => {
191                kernel.request_tx.try_send(message).ok();
192            }
193            _ => {}
194        }
195
196        anyhow::Ok(())
197    }
198
199    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
200        let blocks_to_remove: HashSet<BlockId> =
201            self.blocks.values().map(|block| block.block_id).collect();
202
203        self.editor.update(cx, |editor, cx| {
204            editor.remove_blocks(blocks_to_remove, None, cx);
205        });
206
207        self.blocks.clear();
208    }
209
210    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
211        let execute_request = ExecuteRequest {
212            code: code.to_string(),
213            ..ExecuteRequest::default()
214        };
215
216        let message: JupyterMessage = execute_request.into();
217
218        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
219
220        let buffer = self.editor.read(cx).buffer().read(cx).snapshot(cx);
221
222        self.blocks.retain(|_key, block| {
223            if anchor_range.overlaps(&block.code_range, &buffer) {
224                blocks_to_remove.insert(block.block_id);
225                false
226            } else {
227                true
228            }
229        });
230
231        self.editor.update(cx, |editor, cx| {
232            editor.remove_blocks(blocks_to_remove, None, cx);
233        });
234
235        let status = match &self.kernel {
236            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
237            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
238            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
239            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
240            Kernel::Shutdown => ExecutionStatus::Shutdown,
241        };
242
243        let editor_block = EditorBlock::new(self.editor.clone(), anchor_range, status, cx);
244
245        self.blocks
246            .insert(message.header.msg_id.clone(), editor_block);
247
248        match &self.kernel {
249            Kernel::RunningKernel(_) => {
250                self.send(message, cx).ok();
251            }
252            Kernel::StartingKernel(task) => {
253                // Queue up the execution as a task to run after the kernel starts
254                let task = task.clone();
255                let message = message.clone();
256
257                cx.spawn(|this, mut cx| async move {
258                    task.await;
259                    this.update(&mut cx, |this, cx| {
260                        this.send(message, cx).ok();
261                    })
262                    .ok();
263                })
264                .detach();
265            }
266            _ => {}
267        }
268    }
269
270    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
271        let parent_message_id = match message.parent_header.as_ref() {
272            Some(header) => &header.msg_id,
273            None => return,
274        };
275
276        match &message.content {
277            JupyterMessageContent::Status(status) => {
278                self.kernel.set_execution_state(&status.execution_state);
279                cx.notify();
280            }
281            JupyterMessageContent::KernelInfoReply(reply) => {
282                self.kernel.set_kernel_info(&reply);
283                cx.notify();
284            }
285            _ => {}
286        }
287
288        if let Some(block) = self.blocks.get_mut(parent_message_id) {
289            block.handle_message(&message, cx);
290            return;
291        }
292    }
293
294    fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
295        match &mut self.kernel {
296            Kernel::RunningKernel(_kernel) => {
297                self.send(InterruptRequest {}.into(), cx).ok();
298            }
299            Kernel::StartingKernel(_task) => {
300                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
301            }
302            _ => {}
303        }
304    }
305
306    fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
307        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
308        // todo!(): emit event for the runtime panel to remove this session once in shutdown state
309
310        match kernel {
311            Kernel::RunningKernel(mut kernel) => {
312                let mut request_tx = kernel.request_tx.clone();
313
314                cx.spawn(|this, mut cx| async move {
315                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
316                    request_tx.try_send(message).ok();
317
318                    // Give the kernel a bit of time to clean up
319                    cx.background_executor().timer(Duration::from_secs(3)).await;
320
321                    kernel.process.kill().ok();
322
323                    this.update(&mut cx, |this, cx| {
324                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
325                        this.clear_outputs(cx);
326                        this.kernel = Kernel::Shutdown;
327                        cx.notify();
328                    })
329                    .ok();
330                })
331                .detach();
332            }
333            Kernel::StartingKernel(_kernel) => {
334                self.kernel = Kernel::Shutdown;
335            }
336            _ => {
337                self.kernel = Kernel::Shutdown;
338            }
339        }
340        cx.notify();
341    }
342}
343
344pub enum SessionEvent {
345    Shutdown(View<Editor>),
346}
347
348impl EventEmitter<SessionEvent> for Session {}
349
350impl Render for Session {
351    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
352        let mut buttons = vec![];
353
354        buttons.push(
355            ButtonLike::new("shutdown")
356                .child(Label::new("Shutdown"))
357                .style(ButtonStyle::Subtle)
358                .on_click(cx.listener(move |session, _, cx| {
359                    session.shutdown(cx);
360                })),
361        );
362
363        let status_text = match &self.kernel {
364            Kernel::RunningKernel(kernel) => {
365                buttons.push(
366                    ButtonLike::new("interrupt")
367                        .child(Label::new("Interrupt"))
368                        .style(ButtonStyle::Subtle)
369                        .on_click(cx.listener(move |session, _, cx| {
370                            session.interrupt(cx);
371                        })),
372                );
373                let mut name = self.kernel_specification.name.clone();
374
375                if let Some(info) = &kernel.kernel_info {
376                    name.push_str(" (");
377                    name.push_str(&info.language_info.name);
378                    name.push_str(")");
379                }
380                name
381            }
382            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
383            Kernel::ErroredLaunch(err) => {
384                format!("{} (Error: {})", self.kernel_specification.name, err)
385            }
386            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
387            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
388        };
389
390        return v_flex()
391            .gap_1()
392            .child(
393                h_flex()
394                    .gap_2()
395                    .child(self.kernel.dot())
396                    .child(Label::new(status_text)),
397            )
398            .child(h_flex().gap_2().children(buttons));
399    }
400}