session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    scroll::Autoscroll,
 11    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 12};
 13use futures::{FutureExt as _, StreamExt as _};
 14use gpui::{
 15    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 16};
 17use language::Point;
 18use project::Fs;
 19use runtimelib::{
 20    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 21};
 22use settings::Settings as _;
 23use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
 24use theme::{ActiveTheme, ThemeSettings};
 25use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 26
/// State for one kernel session attached to an editor.
///
/// Holds the kernel's current lifecycle state, the output blocks that have
/// been inserted into the editor, and the task that routes incoming kernel
/// messages to those blocks.
pub struct Session {
    /// Weak handle to the editor this session inserts output blocks into.
    pub editor: WeakView<Editor>,
    /// Current kernel state (starting, running, errored, shutting down or shut down).
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that passes messages received from the kernel to `route`.
    /// Starts out as an already-completed task and is replaced once the kernel is running.
    pub messaging_task: Task<()>,
    /// Specification the kernel was launched from; its `name` is shown when the session is rendered.
    pub kernel_specification: KernelSpecification,
    /// Subscription to the editor buffer's events (handled by `on_buffer_event`); stored but never read.
    _buffer_subscription: Subscription,
}
 35
/// An output area inserted into the editor for a single execution.
struct EditorBlock {
    /// Weak handle to the editor that hosts the block.
    editor: WeakView<Editor>,
    /// Range of the code that was executed; used to detect overlapping re-executions.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the code; once it is no longer
    /// valid in the buffer snapshot the block is removed.
    invalidation_anchor: Anchor,
    /// Id of the block as returned by the editor when it was inserted.
    block_id: BlockId,
    /// View that accumulates and renders the kernel's output for this execution.
    execution_view: View<ExecutionView>,
}
 43
 44impl EditorBlock {
 45    fn new(
 46        editor: WeakView<Editor>,
 47        code_range: Range<Anchor>,
 48        status: ExecutionStatus,
 49        cx: &mut ViewContext<Session>,
 50    ) -> anyhow::Result<Self> {
 51        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 52
 53        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 54            let buffer = editor.buffer().clone();
 55            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 56            let end_point = code_range.end.to_point(&buffer_snapshot);
 57            let next_row_start = end_point + Point::new(1, 0);
 58            if next_row_start > buffer_snapshot.max_point() {
 59                buffer.update(cx, |buffer, cx| {
 60                    buffer.edit(
 61                        [(
 62                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 63                            "\n",
 64                        )],
 65                        None,
 66                        cx,
 67                    )
 68                });
 69            }
 70
 71            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 72            let block = BlockProperties {
 73                position: code_range.end,
 74                height: execution_view.num_lines(cx).saturating_add(1),
 75                style: BlockStyle::Sticky,
 76                render: Self::create_output_area_render(execution_view.clone()),
 77                disposition: BlockDisposition::Below,
 78            };
 79
 80            let block_id = editor.insert_blocks([block], None, cx)[0];
 81            (block_id, invalidation_anchor)
 82        })?;
 83
 84        anyhow::Ok(Self {
 85            editor,
 86            code_range,
 87            invalidation_anchor,
 88            block_id,
 89            execution_view,
 90        })
 91    }
 92
 93    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 94        self.execution_view.update(cx, |execution_view, cx| {
 95            execution_view.push_message(&message.content, cx);
 96        });
 97
 98        self.editor
 99            .update(cx, |editor, cx| {
100                let mut replacements = HashMap::default();
101                replacements.insert(
102                    self.block_id,
103                    (
104                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
105                        Self::create_output_area_render(self.execution_view.clone()),
106                    ),
107                );
108                editor.replace_blocks(replacements, None, cx);
109            })
110            .ok();
111    }
112
113    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
114        let render = move |cx: &mut BlockContext| {
115            let execution_view = execution_view.clone();
116            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
119
120            let gutter_width = cx.gutter_dimensions.width;
121
122            h_flex()
123                .w_full()
124                .bg(cx.theme().colors().background)
125                .border_y_1()
126                .border_color(cx.theme().colors().border)
127                .pl(gutter_width)
128                .child(
129                    div()
130                        .text_size(text_font_size)
131                        .font_family(text_font)
132                        // .ml(gutter_width)
133                        .mx_1()
134                        .my_2()
135                        .h_full()
136                        .w_full()
137                        .mr(gutter_width)
138                        .child(execution_view),
139                )
140                .into_any_element()
141        };
142
143        Box::new(render)
144    }
145}
146
147impl Session {
148    pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
149        if let Some(working_directory) = editor
150            .upgrade()
151            .and_then(|editor| editor.read(cx).working_directory(cx))
152        {
153            working_directory
154        } else {
155            temp_dir()
156        }
157    }
158
159    pub fn new(
160        editor: WeakView<Editor>,
161        fs: Arc<dyn Fs>,
162        kernel_specification: KernelSpecification,
163        cx: &mut ViewContext<Self>,
164    ) -> Self {
165        let entity_id = editor.entity_id();
166
167        let kernel = RunningKernel::new(
168            kernel_specification.clone(),
169            entity_id,
170            Self::working_directory(editor.clone(), cx),
171            fs.clone(),
172            cx,
173        );
174
175        let pending_kernel = cx
176            .spawn(|this, mut cx| async move {
177                let kernel = kernel.await;
178
179                match kernel {
180                    Ok((mut kernel, mut messages_rx)) => {
181                        this.update(&mut cx, |this, cx| {
182                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
183
184                            let status = kernel.process.status();
185                            this.kernel = Kernel::RunningKernel(kernel);
186
187                            cx.spawn(|session, mut cx| async move {
188                                let error_message = match status.await {
189                                    Ok(status) => {
190                                        if status.success() {
191                                            log::info!("kernel process exited successfully");
192                                            return;
193                                        }
194
195                                        format!("kernel process exited with status: {:?}", status)
196                                    }
197                                    Err(err) => {
198                                        format!("kernel process exited with error: {:?}", err)
199                                    }
200                                };
201
202                                log::error!("{}", error_message);
203
204                                session
205                                    .update(&mut cx, |session, cx| {
206                                        session.kernel =
207                                            Kernel::ErroredLaunch(error_message.clone());
208
209                                        session.blocks.values().for_each(|block| {
210                                            block.execution_view.update(
211                                                cx,
212                                                |execution_view, cx| {
213                                                    match execution_view.status {
214                                                        ExecutionStatus::Finished => {
215                                                            // Do nothing when the output was good
216                                                        }
217                                                        _ => {
218                                                            // All other cases, set the status to errored
219                                                            execution_view.status =
220                                                                ExecutionStatus::KernelErrored(
221                                                                    error_message.clone(),
222                                                                )
223                                                        }
224                                                    }
225                                                    cx.notify();
226                                                },
227                                            );
228                                        });
229
230                                        cx.notify();
231                                    })
232                                    .ok();
233                            })
234                            .detach();
235
236                            this.messaging_task = cx.spawn(|session, mut cx| async move {
237                                while let Some(message) = messages_rx.next().await {
238                                    session
239                                        .update(&mut cx, |session, cx| {
240                                            session.route(&message, cx);
241                                        })
242                                        .ok();
243                                }
244                            });
245                        })
246                        .ok();
247                    }
248                    Err(err) => {
249                        this.update(&mut cx, |this, _cx| {
250                            this.kernel = Kernel::ErroredLaunch(err.to_string());
251                        })
252                        .ok();
253                    }
254                }
255            })
256            .shared();
257
258        let subscription = match editor.upgrade() {
259            Some(editor) => {
260                let buffer = editor.read(cx).buffer().clone();
261                cx.subscribe(&buffer, Self::on_buffer_event)
262            }
263            None => Subscription::new(|| {}),
264        };
265
266        return Self {
267            editor,
268            kernel: Kernel::StartingKernel(pending_kernel),
269            messaging_task: Task::ready(()),
270            blocks: HashMap::default(),
271            kernel_specification,
272            _buffer_subscription: subscription,
273        };
274    }
275
276    fn on_buffer_event(
277        &mut self,
278        buffer: Model<MultiBuffer>,
279        event: &multi_buffer::Event,
280        cx: &mut ViewContext<Self>,
281    ) {
282        if let multi_buffer::Event::Edited { .. } = event {
283            let snapshot = buffer.read(cx).snapshot(cx);
284
285            let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
286
287            self.blocks.retain(|_id, block| {
288                if block.invalidation_anchor.is_valid(&snapshot) {
289                    true
290                } else {
291                    blocks_to_remove.insert(block.block_id);
292                    false
293                }
294            });
295
296            if !blocks_to_remove.is_empty() {
297                self.editor
298                    .update(cx, |editor, cx| {
299                        editor.remove_blocks(blocks_to_remove, None, cx);
300                    })
301                    .ok();
302                cx.notify();
303            }
304        }
305    }
306
307    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
308        match &mut self.kernel {
309            Kernel::RunningKernel(kernel) => {
310                kernel.request_tx.try_send(message).ok();
311            }
312            _ => {}
313        }
314
315        anyhow::Ok(())
316    }
317
318    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
319        let blocks_to_remove: HashSet<BlockId> =
320            self.blocks.values().map(|block| block.block_id).collect();
321
322        self.editor
323            .update(cx, |editor, cx| {
324                editor.remove_blocks(blocks_to_remove, None, cx);
325            })
326            .ok();
327
328        self.blocks.clear();
329    }
330
331    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
332        let editor = if let Some(editor) = self.editor.upgrade() {
333            editor
334        } else {
335            return;
336        };
337
338        if code.is_empty() {
339            return;
340        }
341
342        let execute_request = ExecuteRequest {
343            code: code.to_string(),
344            ..ExecuteRequest::default()
345        };
346
347        let message: JupyterMessage = execute_request.into();
348
349        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
350
351        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
352
353        self.blocks.retain(|_key, block| {
354            if anchor_range.overlaps(&block.code_range, &buffer) {
355                blocks_to_remove.insert(block.block_id);
356                false
357            } else {
358                true
359            }
360        });
361
362        self.editor
363            .update(cx, |editor, cx| {
364                editor.remove_blocks(blocks_to_remove, None, cx);
365            })
366            .ok();
367
368        let status = match &self.kernel {
369            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
370            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
371            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
372            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
373            Kernel::Shutdown => ExecutionStatus::Shutdown,
374        };
375
376        let editor_block = if let Ok(editor_block) =
377            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
378        {
379            editor_block
380        } else {
381            return;
382        };
383
384        let new_cursor_pos = editor_block.invalidation_anchor;
385
386        self.blocks
387            .insert(message.header.msg_id.clone(), editor_block);
388
389        match &self.kernel {
390            Kernel::RunningKernel(_) => {
391                self.send(message, cx).ok();
392            }
393            Kernel::StartingKernel(task) => {
394                // Queue up the execution as a task to run after the kernel starts
395                let task = task.clone();
396                let message = message.clone();
397
398                cx.spawn(|this, mut cx| async move {
399                    task.await;
400                    this.update(&mut cx, |this, cx| {
401                        this.send(message, cx).ok();
402                    })
403                    .ok();
404                })
405                .detach();
406            }
407            _ => {}
408        }
409
410        // Now move the cursor to after the block
411        editor.update(cx, move |editor, cx| {
412            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
413                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
414            });
415        });
416    }
417
418    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
419        let parent_message_id = match message.parent_header.as_ref() {
420            Some(header) => &header.msg_id,
421            None => return,
422        };
423
424        match &message.content {
425            JupyterMessageContent::Status(status) => {
426                self.kernel.set_execution_state(&status.execution_state);
427                cx.notify();
428            }
429            JupyterMessageContent::KernelInfoReply(reply) => {
430                self.kernel.set_kernel_info(&reply);
431                cx.notify();
432            }
433            _ => {}
434        }
435
436        if let Some(block) = self.blocks.get_mut(parent_message_id) {
437            block.handle_message(&message, cx);
438            return;
439        }
440    }
441
442    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
443        match &mut self.kernel {
444            Kernel::RunningKernel(_kernel) => {
445                self.send(InterruptRequest {}.into(), cx).ok();
446            }
447            Kernel::StartingKernel(_task) => {
448                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
449            }
450            _ => {}
451        }
452    }
453
454    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
455        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
456
457        match kernel {
458            Kernel::RunningKernel(mut kernel) => {
459                let mut request_tx = kernel.request_tx.clone();
460
461                cx.spawn(|this, mut cx| async move {
462                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
463                    request_tx.try_send(message).ok();
464
465                    // Give the kernel a bit of time to clean up
466                    cx.background_executor().timer(Duration::from_secs(3)).await;
467
468                    kernel.process.kill().ok();
469
470                    this.update(&mut cx, |this, cx| {
471                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
472                        this.clear_outputs(cx);
473                        this.kernel = Kernel::Shutdown;
474                        cx.notify();
475                    })
476                    .ok();
477                })
478                .detach();
479            }
480            Kernel::StartingKernel(_kernel) => {
481                self.kernel = Kernel::Shutdown;
482            }
483            _ => {
484                self.kernel = Kernel::Shutdown;
485            }
486        }
487        cx.notify();
488    }
489}
490
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// Emitted from `Session::shutdown` after a running kernel's process has
    /// been killed; carries the session's editor handle.
    Shutdown(WeakView<Editor>),
}

// Lets `Session` emit `SessionEvent`s through `cx.emit`.
impl EventEmitter<SessionEvent> for Session {}
496
497impl Render for Session {
498    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
499        let mut buttons = vec![];
500
501        buttons.push(
502            ButtonLike::new("shutdown")
503                .child(Label::new("Shutdown"))
504                .style(ButtonStyle::Subtle)
505                .on_click(cx.listener(move |session, _, cx| {
506                    session.shutdown(cx);
507                })),
508        );
509
510        let status_text = match &self.kernel {
511            Kernel::RunningKernel(kernel) => {
512                buttons.push(
513                    ButtonLike::new("interrupt")
514                        .child(Label::new("Interrupt"))
515                        .style(ButtonStyle::Subtle)
516                        .on_click(cx.listener(move |session, _, cx| {
517                            session.interrupt(cx);
518                        })),
519                );
520                let mut name = self.kernel_specification.name.clone();
521
522                if let Some(info) = &kernel.kernel_info {
523                    name.push_str(" (");
524                    name.push_str(&info.language_info.name);
525                    name.push_str(")");
526                }
527                name
528            }
529            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
530            Kernel::ErroredLaunch(err) => {
531                format!("{} (Error: {})", self.kernel_specification.name, err)
532            }
533            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
534            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
535        };
536
537        return v_flex()
538            .gap_1()
539            .child(
540                h_flex()
541                    .gap_2()
542                    .child(self.kernel.dot())
543                    .child(Label::new(status_text)),
544            )
545            .child(h_flex().gap_2().children(buttons));
546    }
547}