session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    scroll::Autoscroll,
 11    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 12};
 13use futures::{FutureExt as _, StreamExt as _};
 14use gpui::{
 15    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 16};
 17use language::Point;
 18use project::Fs;
 19use runtimelib::{
 20    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 21};
 22use settings::Settings as _;
 23use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
 24use theme::{ActiveTheme, ThemeSettings};
 25use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 26
 27pub struct Session {
 28    pub editor: WeakView<Editor>,
 29    pub kernel: Kernel,
 30    blocks: HashMap<String, EditorBlock>,
 31    pub messaging_task: Task<()>,
 32    pub kernel_specification: KernelSpecification,
 33    _buffer_subscription: Subscription,
 34}
 35
 36struct EditorBlock {
 37    editor: WeakView<Editor>,
 38    code_range: Range<Anchor>,
 39    invalidation_anchor: Anchor,
 40    block_id: BlockId,
 41    execution_view: View<ExecutionView>,
 42}
 43
 44impl EditorBlock {
 45    fn new(
 46        editor: WeakView<Editor>,
 47        code_range: Range<Anchor>,
 48        status: ExecutionStatus,
 49        cx: &mut ViewContext<Session>,
 50    ) -> anyhow::Result<Self> {
 51        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 52
 53        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 54            let buffer = editor.buffer().clone();
 55            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 56            let end_point = code_range.end.to_point(&buffer_snapshot);
 57            let next_row_start = end_point + Point::new(1, 0);
 58            if next_row_start > buffer_snapshot.max_point() {
 59                buffer.update(cx, |buffer, cx| {
 60                    buffer.edit(
 61                        [(
 62                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 63                            "\n",
 64                        )],
 65                        None,
 66                        cx,
 67                    )
 68                });
 69            }
 70
 71            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 72            let block = BlockProperties {
 73                position: code_range.end,
 74                height: execution_view.num_lines(cx).saturating_add(1),
 75                style: BlockStyle::Sticky,
 76                render: Self::create_output_area_render(execution_view.clone()),
 77                disposition: BlockDisposition::Below,
 78            };
 79
 80            let block_id = editor.insert_blocks([block], None, cx)[0];
 81            (block_id, invalidation_anchor)
 82        })?;
 83
 84        anyhow::Ok(Self {
 85            editor,
 86            code_range,
 87            invalidation_anchor,
 88            block_id,
 89            execution_view,
 90        })
 91    }
 92
 93    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 94        self.execution_view.update(cx, |execution_view, cx| {
 95            execution_view.push_message(&message.content, cx);
 96        });
 97
 98        self.editor
 99            .update(cx, |editor, cx| {
100                let mut replacements = HashMap::default();
101                replacements.insert(
102                    self.block_id,
103                    (
104                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
105                        Self::create_output_area_render(self.execution_view.clone()),
106                    ),
107                );
108                editor.replace_blocks(replacements, None, cx);
109            })
110            .ok();
111    }
112
113    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
114        let render = move |cx: &mut BlockContext| {
115            let execution_view = execution_view.clone();
116            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
119
120            let gutter_width = cx.gutter_dimensions.width;
121
122            h_flex()
123                .w_full()
124                .bg(cx.theme().colors().background)
125                .border_y_1()
126                .border_color(cx.theme().colors().border)
127                .pl(gutter_width)
128                .child(
129                    div()
130                        .text_size(text_font_size)
131                        .font_family(text_font)
132                        // .ml(gutter_width)
133                        .mx_1()
134                        .my_2()
135                        .h_full()
136                        .w_full()
137                        .mr(gutter_width)
138                        .child(execution_view),
139                )
140                .into_any_element()
141        };
142
143        Box::new(render)
144    }
145}
146
147impl Session {
148    pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
149        if let Some(working_directory) = editor
150            .upgrade()
151            .and_then(|editor| editor.read(cx).working_directory(cx))
152        {
153            working_directory
154        } else {
155            temp_dir()
156        }
157    }
158
159    pub fn new(
160        editor: WeakView<Editor>,
161        fs: Arc<dyn Fs>,
162        kernel_specification: KernelSpecification,
163        cx: &mut ViewContext<Self>,
164    ) -> Self {
165        let entity_id = editor.entity_id();
166
167        let kernel = RunningKernel::new(
168            kernel_specification.clone(),
169            entity_id,
170            Self::working_directory(editor.clone(), cx),
171            fs.clone(),
172            cx,
173        );
174
175        let pending_kernel = cx
176            .spawn(|this, mut cx| async move {
177                let kernel = kernel.await;
178
179                match kernel {
180                    Ok((kernel, mut messages_rx)) => {
181                        this.update(&mut cx, |this, cx| {
182                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
183                            this.kernel = Kernel::RunningKernel(kernel);
184
185                            this.messaging_task = cx.spawn(|session, mut cx| async move {
186                                while let Some(message) = messages_rx.next().await {
187                                    session
188                                        .update(&mut cx, |session, cx| {
189                                            session.route(&message, cx);
190                                        })
191                                        .ok();
192                                }
193                            });
194                        })
195                        .ok();
196                    }
197                    Err(err) => {
198                        this.update(&mut cx, |this, _cx| {
199                            this.kernel = Kernel::ErroredLaunch(err.to_string());
200                        })
201                        .ok();
202                    }
203                }
204            })
205            .shared();
206
207        let subscription = match editor.upgrade() {
208            Some(editor) => {
209                let buffer = editor.read(cx).buffer().clone();
210                cx.subscribe(&buffer, Self::on_buffer_event)
211            }
212            None => Subscription::new(|| {}),
213        };
214
215        return Self {
216            editor,
217            kernel: Kernel::StartingKernel(pending_kernel),
218            messaging_task: Task::ready(()),
219            blocks: HashMap::default(),
220            kernel_specification,
221            _buffer_subscription: subscription,
222        };
223    }
224
225    fn on_buffer_event(
226        &mut self,
227        buffer: Model<MultiBuffer>,
228        event: &multi_buffer::Event,
229        cx: &mut ViewContext<Self>,
230    ) {
231        if let multi_buffer::Event::Edited { .. } = event {
232            let snapshot = buffer.read(cx).snapshot(cx);
233
234            let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
235
236            self.blocks.retain(|_id, block| {
237                if block.invalidation_anchor.is_valid(&snapshot) {
238                    true
239                } else {
240                    blocks_to_remove.insert(block.block_id);
241                    false
242                }
243            });
244
245            if !blocks_to_remove.is_empty() {
246                self.editor
247                    .update(cx, |editor, cx| {
248                        editor.remove_blocks(blocks_to_remove, None, cx);
249                    })
250                    .ok();
251                cx.notify();
252            }
253        }
254    }
255
256    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
257        match &mut self.kernel {
258            Kernel::RunningKernel(kernel) => {
259                kernel.request_tx.try_send(message).ok();
260            }
261            _ => {}
262        }
263
264        anyhow::Ok(())
265    }
266
267    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
268        let blocks_to_remove: HashSet<BlockId> =
269            self.blocks.values().map(|block| block.block_id).collect();
270
271        self.editor
272            .update(cx, |editor, cx| {
273                editor.remove_blocks(blocks_to_remove, None, cx);
274            })
275            .ok();
276
277        self.blocks.clear();
278    }
279
280    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
281        let editor = if let Some(editor) = self.editor.upgrade() {
282            editor
283        } else {
284            return;
285        };
286
287        if code.is_empty() {
288            return;
289        }
290
291        let execute_request = ExecuteRequest {
292            code: code.to_string(),
293            ..ExecuteRequest::default()
294        };
295
296        let message: JupyterMessage = execute_request.into();
297
298        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
299
300        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
301
302        self.blocks.retain(|_key, block| {
303            if anchor_range.overlaps(&block.code_range, &buffer) {
304                blocks_to_remove.insert(block.block_id);
305                false
306            } else {
307                true
308            }
309        });
310
311        self.editor
312            .update(cx, |editor, cx| {
313                editor.remove_blocks(blocks_to_remove, None, cx);
314            })
315            .ok();
316
317        let status = match &self.kernel {
318            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
319            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
320            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
321            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
322            Kernel::Shutdown => ExecutionStatus::Shutdown,
323        };
324
325        let editor_block = if let Ok(editor_block) =
326            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
327        {
328            editor_block
329        } else {
330            return;
331        };
332
333        let new_cursor_pos = editor_block.invalidation_anchor;
334
335        self.blocks
336            .insert(message.header.msg_id.clone(), editor_block);
337
338        match &self.kernel {
339            Kernel::RunningKernel(_) => {
340                self.send(message, cx).ok();
341            }
342            Kernel::StartingKernel(task) => {
343                // Queue up the execution as a task to run after the kernel starts
344                let task = task.clone();
345                let message = message.clone();
346
347                cx.spawn(|this, mut cx| async move {
348                    task.await;
349                    this.update(&mut cx, |this, cx| {
350                        this.send(message, cx).ok();
351                    })
352                    .ok();
353                })
354                .detach();
355            }
356            _ => {}
357        }
358
359        // Now move the cursor to after the block
360        editor.update(cx, move |editor, cx| {
361            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
362                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
363            });
364        });
365    }
366
367    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
368        let parent_message_id = match message.parent_header.as_ref() {
369            Some(header) => &header.msg_id,
370            None => return,
371        };
372
373        match &message.content {
374            JupyterMessageContent::Status(status) => {
375                self.kernel.set_execution_state(&status.execution_state);
376                cx.notify();
377            }
378            JupyterMessageContent::KernelInfoReply(reply) => {
379                self.kernel.set_kernel_info(&reply);
380                cx.notify();
381            }
382            _ => {}
383        }
384
385        if let Some(block) = self.blocks.get_mut(parent_message_id) {
386            block.handle_message(&message, cx);
387            return;
388        }
389    }
390
391    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
392        match &mut self.kernel {
393            Kernel::RunningKernel(_kernel) => {
394                self.send(InterruptRequest {}.into(), cx).ok();
395            }
396            Kernel::StartingKernel(_task) => {
397                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
398            }
399            _ => {}
400        }
401    }
402
403    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
404        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
405
406        match kernel {
407            Kernel::RunningKernel(mut kernel) => {
408                let mut request_tx = kernel.request_tx.clone();
409
410                cx.spawn(|this, mut cx| async move {
411                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
412                    request_tx.try_send(message).ok();
413
414                    // Give the kernel a bit of time to clean up
415                    cx.background_executor().timer(Duration::from_secs(3)).await;
416
417                    kernel.process.kill().ok();
418
419                    this.update(&mut cx, |this, cx| {
420                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
421                        this.clear_outputs(cx);
422                        this.kernel = Kernel::Shutdown;
423                        cx.notify();
424                    })
425                    .ok();
426                })
427                .detach();
428            }
429            Kernel::StartingKernel(_kernel) => {
430                self.kernel = Kernel::Shutdown;
431            }
432            _ => {
433                self.kernel = Kernel::Shutdown;
434            }
435        }
436        cx.notify();
437    }
438}
439
440pub enum SessionEvent {
441    Shutdown(WeakView<Editor>),
442}
443
444impl EventEmitter<SessionEvent> for Session {}
445
446impl Render for Session {
447    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
448        let mut buttons = vec![];
449
450        buttons.push(
451            ButtonLike::new("shutdown")
452                .child(Label::new("Shutdown"))
453                .style(ButtonStyle::Subtle)
454                .on_click(cx.listener(move |session, _, cx| {
455                    session.shutdown(cx);
456                })),
457        );
458
459        let status_text = match &self.kernel {
460            Kernel::RunningKernel(kernel) => {
461                buttons.push(
462                    ButtonLike::new("interrupt")
463                        .child(Label::new("Interrupt"))
464                        .style(ButtonStyle::Subtle)
465                        .on_click(cx.listener(move |session, _, cx| {
466                            session.interrupt(cx);
467                        })),
468                );
469                let mut name = self.kernel_specification.name.clone();
470
471                if let Some(info) = &kernel.kernel_info {
472                    name.push_str(" (");
473                    name.push_str(&info.language_info.name);
474                    name.push_str(")");
475                }
476                name
477            }
478            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
479            Kernel::ErroredLaunch(err) => {
480                format!("{} (Error: {})", self.kernel_specification.name, err)
481            }
482            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
483            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
484        };
485
486        return v_flex()
487            .gap_1()
488            .child(
489                h_flex()
490                    .gap_2()
491                    .child(self.kernel.dot())
492                    .child(Label::new(status_text)),
493            )
494            .child(h_flex().gap_2().children(buttons));
495    }
496}