session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 11};
 12use futures::{FutureExt as _, StreamExt as _};
 13use gpui::{
 14    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 15};
 16use language::Point;
 17use project::Fs;
 18use runtimelib::{
 19    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 20};
 21use settings::Settings as _;
 22use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
 23use theme::{ActiveTheme, ThemeSettings};
 24use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 25
 26pub struct Session {
 27    pub editor: WeakView<Editor>,
 28    pub kernel: Kernel,
 29    blocks: HashMap<String, EditorBlock>,
 30    pub messaging_task: Task<()>,
 31    pub kernel_specification: KernelSpecification,
 32    _buffer_subscription: Subscription,
 33}
 34
 35struct EditorBlock {
 36    editor: WeakView<Editor>,
 37    code_range: Range<Anchor>,
 38    invalidation_anchor: Anchor,
 39    block_id: BlockId,
 40    execution_view: View<ExecutionView>,
 41}
 42
 43impl EditorBlock {
 44    fn new(
 45        editor: WeakView<Editor>,
 46        code_range: Range<Anchor>,
 47        status: ExecutionStatus,
 48        cx: &mut ViewContext<Session>,
 49    ) -> anyhow::Result<Self> {
 50        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 51
 52        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 53            let buffer = editor.buffer().clone();
 54            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 55            let end_point = code_range.end.to_point(&buffer_snapshot);
 56            let next_row_start = end_point + Point::new(1, 0);
 57            if next_row_start > buffer_snapshot.max_point() {
 58                buffer.update(cx, |buffer, cx| {
 59                    buffer.edit(
 60                        [(
 61                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 62                            "\n",
 63                        )],
 64                        None,
 65                        cx,
 66                    )
 67                });
 68            }
 69
 70            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 71            let block = BlockProperties {
 72                position: code_range.end,
 73                height: execution_view.num_lines(cx).saturating_add(1),
 74                style: BlockStyle::Sticky,
 75                render: Self::create_output_area_render(execution_view.clone()),
 76                disposition: BlockDisposition::Below,
 77            };
 78
 79            let block_id = editor.insert_blocks([block], None, cx)[0];
 80            (block_id, invalidation_anchor)
 81        })?;
 82
 83        anyhow::Ok(Self {
 84            editor,
 85            code_range,
 86            invalidation_anchor,
 87            block_id,
 88            execution_view,
 89        })
 90    }
 91
 92    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 93        self.execution_view.update(cx, |execution_view, cx| {
 94            execution_view.push_message(&message.content, cx);
 95        });
 96
 97        self.editor
 98            .update(cx, |editor, cx| {
 99                let mut replacements = HashMap::default();
100                replacements.insert(
101                    self.block_id,
102                    (
103                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
104                        Self::create_output_area_render(self.execution_view.clone()),
105                    ),
106                );
107                editor.replace_blocks(replacements, None, cx);
108            })
109            .ok();
110    }
111
112    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
113        let render = move |cx: &mut BlockContext| {
114            let execution_view = execution_view.clone();
115            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
118
119            let gutter_width = cx.gutter_dimensions.width;
120
121            h_flex()
122                .w_full()
123                .bg(cx.theme().colors().background)
124                .border_y_1()
125                .border_color(cx.theme().colors().border)
126                .pl(gutter_width)
127                .child(
128                    div()
129                        .text_size(text_font_size)
130                        .font_family(text_font)
131                        // .ml(gutter_width)
132                        .mx_1()
133                        .my_2()
134                        .h_full()
135                        .w_full()
136                        .mr(gutter_width)
137                        .child(execution_view),
138                )
139                .into_any_element()
140        };
141
142        Box::new(render)
143    }
144}
145
146impl Session {
147    pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
148        if let Some(working_directory) = editor
149            .upgrade()
150            .and_then(|editor| editor.read(cx).working_directory(cx))
151        {
152            working_directory
153        } else {
154            temp_dir()
155        }
156    }
157
158    pub fn new(
159        editor: WeakView<Editor>,
160        fs: Arc<dyn Fs>,
161        kernel_specification: KernelSpecification,
162        cx: &mut ViewContext<Self>,
163    ) -> Self {
164        let entity_id = editor.entity_id();
165
166        let kernel = RunningKernel::new(
167            kernel_specification.clone(),
168            entity_id,
169            Self::working_directory(editor.clone(), cx),
170            fs.clone(),
171            cx,
172        );
173
174        let pending_kernel = cx
175            .spawn(|this, mut cx| async move {
176                let kernel = kernel.await;
177
178                match kernel {
179                    Ok((kernel, mut messages_rx)) => {
180                        this.update(&mut cx, |this, cx| {
181                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
182                            this.kernel = Kernel::RunningKernel(kernel);
183
184                            this.messaging_task = cx.spawn(|session, mut cx| async move {
185                                while let Some(message) = messages_rx.next().await {
186                                    session
187                                        .update(&mut cx, |session, cx| {
188                                            session.route(&message, cx);
189                                        })
190                                        .ok();
191                                }
192                            });
193                        })
194                        .ok();
195                    }
196                    Err(err) => {
197                        this.update(&mut cx, |this, _cx| {
198                            this.kernel = Kernel::ErroredLaunch(err.to_string());
199                        })
200                        .ok();
201                    }
202                }
203            })
204            .shared();
205
206        let subscription = match editor.upgrade() {
207            Some(editor) => {
208                let buffer = editor.read(cx).buffer().clone();
209                cx.subscribe(&buffer, Self::on_buffer_event)
210            }
211            None => Subscription::new(|| {}),
212        };
213
214        return Self {
215            editor,
216            kernel: Kernel::StartingKernel(pending_kernel),
217            messaging_task: Task::ready(()),
218            blocks: HashMap::default(),
219            kernel_specification,
220            _buffer_subscription: subscription,
221        };
222    }
223
224    fn on_buffer_event(
225        &mut self,
226        buffer: Model<MultiBuffer>,
227        event: &multi_buffer::Event,
228        cx: &mut ViewContext<Self>,
229    ) {
230        if let multi_buffer::Event::Edited { .. } = event {
231            let snapshot = buffer.read(cx).snapshot(cx);
232
233            let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
234
235            self.blocks.retain(|_id, block| {
236                if block.invalidation_anchor.is_valid(&snapshot) {
237                    true
238                } else {
239                    blocks_to_remove.insert(block.block_id);
240                    false
241                }
242            });
243
244            if !blocks_to_remove.is_empty() {
245                self.editor
246                    .update(cx, |editor, cx| {
247                        editor.remove_blocks(blocks_to_remove, None, cx);
248                    })
249                    .ok();
250                cx.notify();
251            }
252        }
253    }
254
255    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
256        match &mut self.kernel {
257            Kernel::RunningKernel(kernel) => {
258                kernel.request_tx.try_send(message).ok();
259            }
260            _ => {}
261        }
262
263        anyhow::Ok(())
264    }
265
266    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
267        let blocks_to_remove: HashSet<BlockId> =
268            self.blocks.values().map(|block| block.block_id).collect();
269
270        self.editor
271            .update(cx, |editor, cx| {
272                editor.remove_blocks(blocks_to_remove, None, cx);
273            })
274            .ok();
275
276        self.blocks.clear();
277    }
278
279    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
280        let editor = if let Some(editor) = self.editor.upgrade() {
281            editor
282        } else {
283            return;
284        };
285
286        if code.is_empty() {
287            return;
288        }
289
290        let execute_request = ExecuteRequest {
291            code: code.to_string(),
292            ..ExecuteRequest::default()
293        };
294
295        let message: JupyterMessage = execute_request.into();
296
297        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
298
299        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
300
301        self.blocks.retain(|_key, block| {
302            if anchor_range.overlaps(&block.code_range, &buffer) {
303                blocks_to_remove.insert(block.block_id);
304                false
305            } else {
306                true
307            }
308        });
309
310        self.editor
311            .update(cx, |editor, cx| {
312                editor.remove_blocks(blocks_to_remove, None, cx);
313            })
314            .ok();
315
316        let status = match &self.kernel {
317            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
318            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
319            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
320            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
321            Kernel::Shutdown => ExecutionStatus::Shutdown,
322        };
323
324        let editor_block = if let Ok(editor_block) =
325            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
326        {
327            editor_block
328        } else {
329            return;
330        };
331
332        self.blocks
333            .insert(message.header.msg_id.clone(), editor_block);
334
335        match &self.kernel {
336            Kernel::RunningKernel(_) => {
337                self.send(message, cx).ok();
338            }
339            Kernel::StartingKernel(task) => {
340                // Queue up the execution as a task to run after the kernel starts
341                let task = task.clone();
342                let message = message.clone();
343
344                cx.spawn(|this, mut cx| async move {
345                    task.await;
346                    this.update(&mut cx, |this, cx| {
347                        this.send(message, cx).ok();
348                    })
349                    .ok();
350                })
351                .detach();
352            }
353            _ => {}
354        }
355    }
356
357    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
358        let parent_message_id = match message.parent_header.as_ref() {
359            Some(header) => &header.msg_id,
360            None => return,
361        };
362
363        match &message.content {
364            JupyterMessageContent::Status(status) => {
365                self.kernel.set_execution_state(&status.execution_state);
366                cx.notify();
367            }
368            JupyterMessageContent::KernelInfoReply(reply) => {
369                self.kernel.set_kernel_info(&reply);
370                cx.notify();
371            }
372            _ => {}
373        }
374
375        if let Some(block) = self.blocks.get_mut(parent_message_id) {
376            block.handle_message(&message, cx);
377            return;
378        }
379    }
380
381    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
382        match &mut self.kernel {
383            Kernel::RunningKernel(_kernel) => {
384                self.send(InterruptRequest {}.into(), cx).ok();
385            }
386            Kernel::StartingKernel(_task) => {
387                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
388            }
389            _ => {}
390        }
391    }
392
393    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
394        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
395
396        match kernel {
397            Kernel::RunningKernel(mut kernel) => {
398                let mut request_tx = kernel.request_tx.clone();
399
400                cx.spawn(|this, mut cx| async move {
401                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
402                    request_tx.try_send(message).ok();
403
404                    // Give the kernel a bit of time to clean up
405                    cx.background_executor().timer(Duration::from_secs(3)).await;
406
407                    kernel.process.kill().ok();
408
409                    this.update(&mut cx, |this, cx| {
410                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
411                        this.clear_outputs(cx);
412                        this.kernel = Kernel::Shutdown;
413                        cx.notify();
414                    })
415                    .ok();
416                })
417                .detach();
418            }
419            Kernel::StartingKernel(_kernel) => {
420                self.kernel = Kernel::Shutdown;
421            }
422            _ => {
423                self.kernel = Kernel::Shutdown;
424            }
425        }
426        cx.notify();
427    }
428}
429
430pub enum SessionEvent {
431    Shutdown(WeakView<Editor>),
432}
433
434impl EventEmitter<SessionEvent> for Session {}
435
436impl Render for Session {
437    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
438        let mut buttons = vec![];
439
440        buttons.push(
441            ButtonLike::new("shutdown")
442                .child(Label::new("Shutdown"))
443                .style(ButtonStyle::Subtle)
444                .on_click(cx.listener(move |session, _, cx| {
445                    session.shutdown(cx);
446                })),
447        );
448
449        let status_text = match &self.kernel {
450            Kernel::RunningKernel(kernel) => {
451                buttons.push(
452                    ButtonLike::new("interrupt")
453                        .child(Label::new("Interrupt"))
454                        .style(ButtonStyle::Subtle)
455                        .on_click(cx.listener(move |session, _, cx| {
456                            session.interrupt(cx);
457                        })),
458                );
459                let mut name = self.kernel_specification.name.clone();
460
461                if let Some(info) = &kernel.kernel_info {
462                    name.push_str(" (");
463                    name.push_str(&info.language_info.name);
464                    name.push_str(")");
465                }
466                name
467            }
468            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
469            Kernel::ErroredLaunch(err) => {
470                format!("{} (Error: {})", self.kernel_specification.name, err)
471            }
472            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
473            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
474        };
475
476        return v_flex()
477            .gap_1()
478            .child(
479                h_flex()
480                    .gap_2()
481                    .child(self.kernel.dot())
482                    .child(Label::new(status_text)),
483            )
484            .child(h_flex().gap_2().children(buttons));
485    }
486}