session.rs

  1use crate::{
  2    kernels::{Kernel, KernelSpecification, RunningKernel},
  3    outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
  4};
  5use collections::{HashMap, HashSet};
  6use editor::{
  7    display_map::{
  8        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  9    },
 10    Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
 11};
 12use futures::{FutureExt as _, StreamExt as _};
 13use gpui::{
 14    div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
 15};
 16use language::Point;
 17use project::Fs;
 18use runtimelib::{
 19    ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
 20};
 21use settings::Settings as _;
 22use std::{ops::Range, sync::Arc, time::Duration};
 23use theme::{ActiveTheme, ThemeSettings};
 24use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
 25
 26pub struct Session {
 27    pub editor: WeakView<Editor>,
 28    pub kernel: Kernel,
 29    blocks: HashMap<String, EditorBlock>,
 30    pub messaging_task: Task<()>,
 31    pub kernel_specification: KernelSpecification,
 32    _buffer_subscription: Subscription,
 33}
 34
 35struct EditorBlock {
 36    editor: WeakView<Editor>,
 37    code_range: Range<Anchor>,
 38    invalidation_anchor: Anchor,
 39    block_id: BlockId,
 40    execution_view: View<ExecutionView>,
 41}
 42
 43impl EditorBlock {
 44    fn new(
 45        editor: WeakView<Editor>,
 46        code_range: Range<Anchor>,
 47        status: ExecutionStatus,
 48        cx: &mut ViewContext<Session>,
 49    ) -> anyhow::Result<Self> {
 50        let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
 51
 52        let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
 53            let buffer = editor.buffer().clone();
 54            let buffer_snapshot = buffer.read(cx).snapshot(cx);
 55            let end_point = code_range.end.to_point(&buffer_snapshot);
 56            let next_row_start = end_point + Point::new(1, 0);
 57            if next_row_start > buffer_snapshot.max_point() {
 58                buffer.update(cx, |buffer, cx| {
 59                    buffer.edit(
 60                        [(
 61                            buffer_snapshot.max_point()..buffer_snapshot.max_point(),
 62                            "\n",
 63                        )],
 64                        None,
 65                        cx,
 66                    )
 67                });
 68            }
 69
 70            let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
 71            let block = BlockProperties {
 72                position: code_range.end,
 73                height: execution_view.num_lines(cx).saturating_add(1),
 74                style: BlockStyle::Sticky,
 75                render: Self::create_output_area_render(execution_view.clone()),
 76                disposition: BlockDisposition::Below,
 77            };
 78
 79            let block_id = editor.insert_blocks([block], None, cx)[0];
 80            (block_id, invalidation_anchor)
 81        })?;
 82
 83        anyhow::Ok(Self {
 84            editor,
 85            code_range,
 86            invalidation_anchor,
 87            block_id,
 88            execution_view,
 89        })
 90    }
 91
 92    fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
 93        self.execution_view.update(cx, |execution_view, cx| {
 94            execution_view.push_message(&message.content, cx);
 95        });
 96
 97        self.editor
 98            .update(cx, |editor, cx| {
 99                let mut replacements = HashMap::default();
100                replacements.insert(
101                    self.block_id,
102                    (
103                        Some(self.execution_view.num_lines(cx).saturating_add(1)),
104                        Self::create_output_area_render(self.execution_view.clone()),
105                    ),
106                );
107                editor.replace_blocks(replacements, None, cx);
108            })
109            .ok();
110    }
111
112    fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
113        let render = move |cx: &mut BlockContext| {
114            let execution_view = execution_view.clone();
115            let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116            let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117            // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
118
119            let gutter_width = cx.gutter_dimensions.width;
120
121            h_flex()
122                .w_full()
123                .bg(cx.theme().colors().background)
124                .border_y_1()
125                .border_color(cx.theme().colors().border)
126                .pl(gutter_width)
127                .child(
128                    div()
129                        .text_size(text_font_size)
130                        .font_family(text_font)
131                        // .ml(gutter_width)
132                        .mx_1()
133                        .my_2()
134                        .h_full()
135                        .w_full()
136                        .mr(gutter_width)
137                        .child(execution_view),
138                )
139                .into_any_element()
140        };
141
142        Box::new(render)
143    }
144}
145
146impl Session {
147    pub fn new(
148        editor: WeakView<Editor>,
149        fs: Arc<dyn Fs>,
150        kernel_specification: KernelSpecification,
151        cx: &mut ViewContext<Self>,
152    ) -> Self {
153        let entity_id = editor.entity_id();
154        let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
155
156        let pending_kernel = cx
157            .spawn(|this, mut cx| async move {
158                let kernel = kernel.await;
159
160                match kernel {
161                    Ok((kernel, mut messages_rx)) => {
162                        this.update(&mut cx, |this, cx| {
163                            // At this point we can create a new kind of kernel that has the process and our long running background tasks
164                            this.kernel = Kernel::RunningKernel(kernel);
165
166                            this.messaging_task = cx.spawn(|session, mut cx| async move {
167                                while let Some(message) = messages_rx.next().await {
168                                    session
169                                        .update(&mut cx, |session, cx| {
170                                            session.route(&message, cx);
171                                        })
172                                        .ok();
173                                }
174                            });
175                        })
176                        .ok();
177                    }
178                    Err(err) => {
179                        this.update(&mut cx, |this, _cx| {
180                            this.kernel = Kernel::ErroredLaunch(err.to_string());
181                        })
182                        .ok();
183                    }
184                }
185            })
186            .shared();
187
188        let subscription = match editor.upgrade() {
189            Some(editor) => {
190                let buffer = editor.read(cx).buffer().clone();
191                cx.subscribe(&buffer, Self::on_buffer_event)
192            }
193            None => Subscription::new(|| {}),
194        };
195
196        return Self {
197            editor,
198            kernel: Kernel::StartingKernel(pending_kernel),
199            messaging_task: Task::ready(()),
200            blocks: HashMap::default(),
201            kernel_specification,
202            _buffer_subscription: subscription,
203        };
204    }
205
206    fn on_buffer_event(
207        &mut self,
208        buffer: Model<MultiBuffer>,
209        event: &multi_buffer::Event,
210        cx: &mut ViewContext<Self>,
211    ) {
212        if let multi_buffer::Event::Edited { .. } = event {
213            let snapshot = buffer.read(cx).snapshot(cx);
214
215            let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
216
217            self.blocks.retain(|_id, block| {
218                if block.invalidation_anchor.is_valid(&snapshot) {
219                    true
220                } else {
221                    blocks_to_remove.insert(block.block_id);
222                    false
223                }
224            });
225
226            if !blocks_to_remove.is_empty() {
227                self.editor
228                    .update(cx, |editor, cx| {
229                        editor.remove_blocks(blocks_to_remove, None, cx);
230                    })
231                    .ok();
232                cx.notify();
233            }
234        }
235    }
236
237    fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
238        match &mut self.kernel {
239            Kernel::RunningKernel(kernel) => {
240                kernel.request_tx.try_send(message).ok();
241            }
242            _ => {}
243        }
244
245        anyhow::Ok(())
246    }
247
248    pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
249        let blocks_to_remove: HashSet<BlockId> =
250            self.blocks.values().map(|block| block.block_id).collect();
251
252        self.editor
253            .update(cx, |editor, cx| {
254                editor.remove_blocks(blocks_to_remove, None, cx);
255            })
256            .ok();
257
258        self.blocks.clear();
259    }
260
261    pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
262        let editor = if let Some(editor) = self.editor.upgrade() {
263            editor
264        } else {
265            return;
266        };
267
268        if code.is_empty() {
269            return;
270        }
271
272        let execute_request = ExecuteRequest {
273            code: code.to_string(),
274            ..ExecuteRequest::default()
275        };
276
277        let message: JupyterMessage = execute_request.into();
278
279        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
280
281        let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
282
283        self.blocks.retain(|_key, block| {
284            if anchor_range.overlaps(&block.code_range, &buffer) {
285                blocks_to_remove.insert(block.block_id);
286                false
287            } else {
288                true
289            }
290        });
291
292        self.editor
293            .update(cx, |editor, cx| {
294                editor.remove_blocks(blocks_to_remove, None, cx);
295            })
296            .ok();
297
298        let status = match &self.kernel {
299            Kernel::RunningKernel(_) => ExecutionStatus::Queued,
300            Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
301            Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
302            Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
303            Kernel::Shutdown => ExecutionStatus::Shutdown,
304        };
305
306        let editor_block = if let Ok(editor_block) =
307            EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
308        {
309            editor_block
310        } else {
311            return;
312        };
313
314        self.blocks
315            .insert(message.header.msg_id.clone(), editor_block);
316
317        match &self.kernel {
318            Kernel::RunningKernel(_) => {
319                self.send(message, cx).ok();
320            }
321            Kernel::StartingKernel(task) => {
322                // Queue up the execution as a task to run after the kernel starts
323                let task = task.clone();
324                let message = message.clone();
325
326                cx.spawn(|this, mut cx| async move {
327                    task.await;
328                    this.update(&mut cx, |this, cx| {
329                        this.send(message, cx).ok();
330                    })
331                    .ok();
332                })
333                .detach();
334            }
335            _ => {}
336        }
337    }
338
339    fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
340        let parent_message_id = match message.parent_header.as_ref() {
341            Some(header) => &header.msg_id,
342            None => return,
343        };
344
345        match &message.content {
346            JupyterMessageContent::Status(status) => {
347                self.kernel.set_execution_state(&status.execution_state);
348                cx.notify();
349            }
350            JupyterMessageContent::KernelInfoReply(reply) => {
351                self.kernel.set_kernel_info(&reply);
352                cx.notify();
353            }
354            _ => {}
355        }
356
357        if let Some(block) = self.blocks.get_mut(parent_message_id) {
358            block.handle_message(&message, cx);
359            return;
360        }
361    }
362
363    pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
364        match &mut self.kernel {
365            Kernel::RunningKernel(_kernel) => {
366                self.send(InterruptRequest {}.into(), cx).ok();
367            }
368            Kernel::StartingKernel(_task) => {
369                // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
370            }
371            _ => {}
372        }
373    }
374
375    pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
376        let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
377
378        match kernel {
379            Kernel::RunningKernel(mut kernel) => {
380                let mut request_tx = kernel.request_tx.clone();
381
382                cx.spawn(|this, mut cx| async move {
383                    let message: JupyterMessage = ShutdownRequest { restart: false }.into();
384                    request_tx.try_send(message).ok();
385
386                    // Give the kernel a bit of time to clean up
387                    cx.background_executor().timer(Duration::from_secs(3)).await;
388
389                    kernel.process.kill().ok();
390
391                    this.update(&mut cx, |this, cx| {
392                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
393                        this.clear_outputs(cx);
394                        this.kernel = Kernel::Shutdown;
395                        cx.notify();
396                    })
397                    .ok();
398                })
399                .detach();
400            }
401            Kernel::StartingKernel(_kernel) => {
402                self.kernel = Kernel::Shutdown;
403            }
404            _ => {
405                self.kernel = Kernel::Shutdown;
406            }
407        }
408        cx.notify();
409    }
410}
411
412pub enum SessionEvent {
413    Shutdown(WeakView<Editor>),
414}
415
416impl EventEmitter<SessionEvent> for Session {}
417
418impl Render for Session {
419    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
420        let mut buttons = vec![];
421
422        buttons.push(
423            ButtonLike::new("shutdown")
424                .child(Label::new("Shutdown"))
425                .style(ButtonStyle::Subtle)
426                .on_click(cx.listener(move |session, _, cx| {
427                    session.shutdown(cx);
428                })),
429        );
430
431        let status_text = match &self.kernel {
432            Kernel::RunningKernel(kernel) => {
433                buttons.push(
434                    ButtonLike::new("interrupt")
435                        .child(Label::new("Interrupt"))
436                        .style(ButtonStyle::Subtle)
437                        .on_click(cx.listener(move |session, _, cx| {
438                            session.interrupt(cx);
439                        })),
440                );
441                let mut name = self.kernel_specification.name.clone();
442
443                if let Some(info) = &kernel.kernel_info {
444                    name.push_str(" (");
445                    name.push_str(&info.language_info.name);
446                    name.push_str(")");
447                }
448                name
449            }
450            Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
451            Kernel::ErroredLaunch(err) => {
452                format!("{} (Error: {})", self.kernel_specification.name, err)
453            }
454            Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
455            Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
456        };
457
458        return v_flex()
459            .gap_1()
460            .child(
461                h_flex()
462                    .gap_2()
463                    .child(self.kernel.dot())
464                    .child(Label::new(status_text)),
465            )
466            .child(h_flex().gap_2().children(buttons));
467    }
468}