repl.rs

  1use anyhow::{anyhow, Context as _, Result};
  2use async_dispatcher::{set_dispatcher, timeout, Dispatcher, Runnable};
  3use collections::{HashMap, HashSet};
  4use editor::{
  5    display_map::{
  6        BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
  7    },
  8    Anchor, AnchorRangeExt, Editor,
  9};
 10use futures::{
 11    channel::mpsc::{self, UnboundedSender},
 12    future::Shared,
 13    Future, FutureExt, SinkExt as _, StreamExt,
 14};
 15use gpui::prelude::*;
 16use gpui::{
 17    actions, AppContext, Context, EntityId, Global, Model, ModelContext, PlatformDispatcher, Task,
 18    WeakView,
 19};
 20use gpui::{Entity, View};
 21use language::Point;
 22use outputs::{ExecutionStatus, ExecutionView, LineHeight as _};
 23use project::Fs;
 24use runtime_settings::JupyterSettings;
 25use runtimelib::JupyterMessageContent;
 26use settings::{Settings as _, SettingsStore};
 27use std::{ops::Range, time::Instant};
 28use std::{sync::Arc, time::Duration};
 29use theme::{ActiveTheme, ThemeSettings};
 30use ui::prelude::*;
 31use workspace::Workspace;
 32
 33mod outputs;
 34// mod runtime_panel;
 35mod runtime_settings;
 36mod runtimes;
 37mod stdio;
 38
 39use runtimes::{get_runtime_specifications, Request, RunningKernel, RuntimeSpecification};
 40
 41actions!(repl, [Run]);
 42
 43#[derive(Clone)]
 44pub struct RuntimeManagerGlobal(Model<RuntimeManager>);
 45
 46impl Global for RuntimeManagerGlobal {}
 47
 48pub fn zed_dispatcher(cx: &mut AppContext) -> impl Dispatcher {
 49    struct ZedDispatcher {
 50        dispatcher: Arc<dyn PlatformDispatcher>,
 51    }
 52
 53    // PlatformDispatcher is _super_ close to the same interface we put in
 54    // async-dispatcher, except for the task label in dispatch. Later we should
 55    // just make that consistent so we have this dispatcher ready to go for
 56    // other crates in Zed.
 57    impl Dispatcher for ZedDispatcher {
 58        fn dispatch(&self, runnable: Runnable) {
 59            self.dispatcher.dispatch(runnable, None)
 60        }
 61
 62        fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
 63            self.dispatcher.dispatch_after(duration, runnable);
 64        }
 65    }
 66
 67    ZedDispatcher {
 68        dispatcher: cx.background_executor().dispatcher.clone(),
 69    }
 70}
 71
 72pub fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
 73    set_dispatcher(zed_dispatcher(cx));
 74    JupyterSettings::register(cx);
 75
 76    observe_jupyter_settings_changes(fs.clone(), cx);
 77
 78    cx.observe_new_views(
 79        |workspace: &mut Workspace, _: &mut ViewContext<Workspace>| {
 80            workspace.register_action(run);
 81        },
 82    )
 83    .detach();
 84
 85    let settings = JupyterSettings::get_global(cx);
 86
 87    if !settings.enabled {
 88        return;
 89    }
 90
 91    initialize_runtime_manager(fs, cx);
 92}
 93
 94fn initialize_runtime_manager(fs: Arc<dyn Fs>, cx: &mut AppContext) {
 95    let runtime_manager = cx.new_model(|cx| RuntimeManager::new(fs.clone(), cx));
 96    RuntimeManager::set_global(runtime_manager.clone(), cx);
 97
 98    cx.spawn(|mut cx| async move {
 99        let fs = fs.clone();
100
101        let runtime_specifications = get_runtime_specifications(fs).await?;
102
103        runtime_manager.update(&mut cx, |this, _cx| {
104            this.runtime_specifications = runtime_specifications;
105        })?;
106
107        anyhow::Ok(())
108    })
109    .detach_and_log_err(cx);
110}
111
112fn observe_jupyter_settings_changes(fs: Arc<dyn Fs>, cx: &mut AppContext) {
113    cx.observe_global::<SettingsStore>(move |cx| {
114        let settings = JupyterSettings::get_global(cx);
115        if settings.enabled && RuntimeManager::global(cx).is_none() {
116            initialize_runtime_manager(fs.clone(), cx);
117        } else {
118            RuntimeManager::remove_global(cx);
119            // todo!(): Remove action from workspace(s)
120        }
121    })
122    .detach();
123}
124
125#[derive(Debug)]
126pub enum Kernel {
127    RunningKernel(RunningKernel),
128    StartingKernel(Shared<Task<()>>),
129    FailedLaunch,
130}
131
132// Per workspace
133pub struct RuntimeManager {
134    fs: Arc<dyn Fs>,
135    runtime_specifications: Vec<RuntimeSpecification>,
136
137    instances: HashMap<EntityId, Kernel>,
138    editors: HashMap<WeakView<Editor>, EditorRuntimeState>,
139    // todo!(): Next
140    // To reduce the number of open tasks and channels we have, let's feed the response
141    // messages by ID over to the paired ExecutionView
142    _execution_views_by_id: HashMap<String, View<ExecutionView>>,
143}
144
145#[derive(Debug, Clone)]
146struct EditorRuntimeState {
147    blocks: Vec<EditorRuntimeBlock>,
148    // todo!(): Store a subscription to the editor so we can drop them when the editor is dropped
149    // subscription: gpui::Subscription,
150}
151
152#[derive(Debug, Clone)]
153struct EditorRuntimeBlock {
154    code_range: Range<Anchor>,
155    _execution_id: String,
156    block_id: BlockId,
157    _execution_view: View<ExecutionView>,
158}
159
160impl RuntimeManager {
161    pub fn new(fs: Arc<dyn Fs>, _cx: &mut AppContext) -> Self {
162        Self {
163            fs,
164            runtime_specifications: Default::default(),
165            instances: Default::default(),
166            editors: Default::default(),
167            _execution_views_by_id: Default::default(),
168        }
169    }
170
171    fn get_or_launch_kernel(
172        &mut self,
173        entity_id: EntityId,
174        language_name: Arc<str>,
175        cx: &mut ModelContext<Self>,
176    ) -> Task<Result<UnboundedSender<Request>>> {
177        let kernel = self.instances.get(&entity_id);
178        let pending_kernel_start = match kernel {
179            Some(Kernel::RunningKernel(running_kernel)) => {
180                return Task::ready(anyhow::Ok(running_kernel.request_tx.clone()));
181            }
182            Some(Kernel::StartingKernel(task)) => task.clone(),
183            Some(Kernel::FailedLaunch) | None => {
184                self.instances.remove(&entity_id);
185
186                let kernel = self.launch_kernel(entity_id, language_name, cx);
187                let pending_kernel = cx
188                    .spawn(|this, mut cx| async move {
189                        let running_kernel = kernel.await;
190
191                        match running_kernel {
192                            Ok(running_kernel) => {
193                                let _ = this.update(&mut cx, |this, _cx| {
194                                    this.instances
195                                        .insert(entity_id, Kernel::RunningKernel(running_kernel));
196                                });
197                            }
198                            Err(_err) => {
199                                let _ = this.update(&mut cx, |this, _cx| {
200                                    this.instances.insert(entity_id, Kernel::FailedLaunch);
201                                });
202                            }
203                        }
204                    })
205                    .shared();
206
207                self.instances
208                    .insert(entity_id, Kernel::StartingKernel(pending_kernel.clone()));
209
210                pending_kernel
211            }
212        };
213
214        cx.spawn(|this, mut cx| async move {
215            pending_kernel_start.await;
216
217            this.update(&mut cx, |this, _cx| {
218                let kernel = this
219                    .instances
220                    .get(&entity_id)
221                    .ok_or(anyhow!("unable to get a running kernel"))?;
222
223                match kernel {
224                    Kernel::RunningKernel(running_kernel) => Ok(running_kernel.request_tx.clone()),
225                    _ => Err(anyhow!("unable to get a running kernel")),
226                }
227            })?
228        })
229    }
230
231    fn launch_kernel(
232        &mut self,
233        entity_id: EntityId,
234        language_name: Arc<str>,
235        cx: &mut ModelContext<Self>,
236    ) -> Task<Result<RunningKernel>> {
237        // Get first runtime that matches the language name (for now)
238        let runtime_specification =
239            self.runtime_specifications
240                .iter()
241                .find(|runtime_specification| {
242                    runtime_specification.kernelspec.language == language_name.to_string()
243                });
244
245        let runtime_specification = match runtime_specification {
246            Some(runtime_specification) => runtime_specification,
247            None => {
248                return Task::ready(Err(anyhow::anyhow!(
249                    "No runtime found for language {}",
250                    language_name
251                )));
252            }
253        };
254
255        let runtime_specification = runtime_specification.clone();
256
257        let fs = self.fs.clone();
258
259        cx.spawn(|_, cx| async move {
260            let running_kernel =
261                RunningKernel::new(runtime_specification, entity_id, fs.clone(), cx);
262
263            let running_kernel = running_kernel.await?;
264
265            let mut request_tx = running_kernel.request_tx.clone();
266
267            let overall_timeout_duration = Duration::from_secs(10);
268
269            let start_time = Instant::now();
270
271            loop {
272                if start_time.elapsed() > overall_timeout_duration {
273                    // todo!(): Kill the kernel
274                    return Err(anyhow::anyhow!("Kernel did not respond in time"));
275                }
276
277                let (tx, rx) = mpsc::unbounded();
278                match request_tx
279                    .send(Request {
280                        request: runtimelib::KernelInfoRequest {}.into(),
281                        responses_rx: tx,
282                    })
283                    .await
284                {
285                    Ok(_) => {}
286                    Err(_err) => {
287                        break;
288                    }
289                };
290
291                let mut rx = rx.fuse();
292
293                let kernel_info_timeout = Duration::from_secs(1);
294
295                let mut got_kernel_info = false;
296                while let Ok(Some(message)) = timeout(kernel_info_timeout, rx.next()).await {
297                    match message {
298                        JupyterMessageContent::KernelInfoReply(_) => {
299                            got_kernel_info = true;
300                        }
301                        _ => {}
302                    }
303                }
304
305                if got_kernel_info {
306                    break;
307                }
308            }
309
310            anyhow::Ok(running_kernel)
311        })
312    }
313
314    fn execute_code(
315        &mut self,
316        entity_id: EntityId,
317        language_name: Arc<str>,
318        code: String,
319        cx: &mut ModelContext<Self>,
320    ) -> impl Future<Output = Result<mpsc::UnboundedReceiver<JupyterMessageContent>>> {
321        let (tx, rx) = mpsc::unbounded();
322
323        let request_tx = self.get_or_launch_kernel(entity_id, language_name, cx);
324
325        async move {
326            let request_tx = request_tx.await?;
327
328            request_tx
329                .unbounded_send(Request {
330                    request: runtimelib::ExecuteRequest {
331                        code,
332                        allow_stdin: false,
333                        silent: false,
334                        store_history: true,
335                        stop_on_error: true,
336                        ..Default::default()
337                    }
338                    .into(),
339                    responses_rx: tx,
340                })
341                .context("Failed to send execution request")?;
342
343            Ok(rx)
344        }
345    }
346
347    pub fn global(cx: &AppContext) -> Option<Model<Self>> {
348        cx.try_global::<RuntimeManagerGlobal>()
349            .map(|runtime_manager| runtime_manager.0.clone())
350    }
351
352    pub fn set_global(runtime_manager: Model<Self>, cx: &mut AppContext) {
353        cx.set_global(RuntimeManagerGlobal(runtime_manager));
354    }
355
356    pub fn remove_global(cx: &mut AppContext) {
357        if RuntimeManager::global(cx).is_some() {
358            cx.remove_global::<RuntimeManagerGlobal>();
359        }
360    }
361}
362
363pub fn get_active_editor(
364    workspace: &mut Workspace,
365    cx: &mut ViewContext<Workspace>,
366) -> Option<View<Editor>> {
367    workspace
368        .active_item(cx)
369        .and_then(|item| item.act_as::<Editor>(cx))
370}
371
372// Gets the active selection in the editor or the current line
373pub fn selection(editor: View<Editor>, cx: &mut ViewContext<Workspace>) -> Range<Anchor> {
374    let editor = editor.read(cx);
375    let selection = editor.selections.newest::<usize>(cx);
376    let buffer = editor.buffer().read(cx).snapshot(cx);
377
378    let range = if selection.is_empty() {
379        let cursor = selection.head();
380
381        let line_start = buffer.offset_to_point(cursor).row;
382        let mut start_offset = buffer.point_to_offset(Point::new(line_start, 0));
383
384        // Iterate backwards to find the start of the line
385        while start_offset > 0 {
386            let ch = buffer.chars_at(start_offset - 1).next().unwrap_or('\0');
387            if ch == '\n' {
388                break;
389            }
390            start_offset -= 1;
391        }
392
393        let mut end_offset = cursor;
394
395        // Iterate forwards to find the end of the line
396        while end_offset < buffer.len() {
397            let ch = buffer.chars_at(end_offset).next().unwrap_or('\0');
398            if ch == '\n' {
399                break;
400            }
401            end_offset += 1;
402        }
403
404        // Create a range from the start to the end of the line
405        start_offset..end_offset
406    } else {
407        selection.range()
408    };
409
410    let anchor_range = buffer.anchor_before(range.start)..buffer.anchor_after(range.end);
411    anchor_range
412}
413
414pub fn run(workspace: &mut Workspace, _: &Run, cx: &mut ViewContext<Workspace>) {
415    let (editor, runtime_manager) = if let (Some(editor), Some(runtime_manager)) =
416        (get_active_editor(workspace, cx), RuntimeManager::global(cx))
417    {
418        (editor, runtime_manager)
419    } else {
420        log::warn!("No active editor or runtime manager found");
421        return;
422    };
423
424    let anchor_range = selection(editor.clone(), cx);
425
426    let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
427
428    let selected_text = buffer
429        .text_for_range(anchor_range.clone())
430        .collect::<String>();
431
432    let start_language = buffer.language_at(anchor_range.start);
433    let end_language = buffer.language_at(anchor_range.end);
434
435    let language_name = if start_language == end_language {
436        start_language
437            .map(|language| language.code_fence_block_name())
438            .filter(|lang| **lang != *"markdown")
439    } else {
440        // If the selection spans multiple languages, don't run it
441        return;
442    };
443
444    let language_name = if let Some(language_name) = language_name {
445        language_name
446    } else {
447        return;
448    };
449
450    let entity_id = editor.entity_id();
451
452    let execution_view = cx.new_view(|cx| ExecutionView::new(cx));
453
454    // If any block overlaps with the new block, remove it
455    // TODO: When inserting a new block, put it in order so that search is efficient
456    let blocks_to_remove = runtime_manager.update(cx, |runtime_manager, _cx| {
457        // Get the current `EditorRuntimeState` for this runtime_manager, inserting it if it doesn't exist
458        let editor_runtime_state = runtime_manager
459            .editors
460            .entry(editor.downgrade())
461            .or_insert_with(|| EditorRuntimeState { blocks: Vec::new() });
462
463        let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
464
465        editor_runtime_state.blocks.retain(|block| {
466            if anchor_range.overlaps(&block.code_range, &buffer) {
467                blocks_to_remove.insert(block.block_id);
468                // Drop this block
469                false
470            } else {
471                true
472            }
473        });
474
475        blocks_to_remove
476    });
477
478    let blocks_to_remove = blocks_to_remove.clone();
479
480    let block_id = editor.update(cx, |editor, cx| {
481        editor.remove_blocks(blocks_to_remove, None, cx);
482        let block = BlockProperties {
483            position: anchor_range.end,
484            height: execution_view.num_lines(cx).saturating_add(1),
485            style: BlockStyle::Sticky,
486            render: create_output_area_render(execution_view.clone()),
487            disposition: BlockDisposition::Below,
488        };
489
490        editor.insert_blocks([block], None, cx)[0]
491    });
492
493    let receiver = runtime_manager.update(cx, |runtime_manager, cx| {
494        let editor_runtime_state = runtime_manager
495            .editors
496            .entry(editor.downgrade())
497            .or_insert_with(|| EditorRuntimeState { blocks: Vec::new() });
498
499        let editor_runtime_block = EditorRuntimeBlock {
500            code_range: anchor_range.clone(),
501            block_id,
502            _execution_view: execution_view.clone(),
503            _execution_id: Default::default(),
504        };
505
506        editor_runtime_state
507            .blocks
508            .push(editor_runtime_block.clone());
509
510        runtime_manager.execute_code(entity_id, language_name, selected_text.clone(), cx)
511    });
512
513    cx.spawn(|_this, mut cx| async move {
514        execution_view.update(&mut cx, |execution_view, cx| {
515            execution_view.set_status(ExecutionStatus::ConnectingToKernel, cx);
516        })?;
517        let mut receiver = receiver.await?;
518
519        let execution_view = execution_view.clone();
520        while let Some(content) = receiver.next().await {
521            execution_view.update(&mut cx, |execution_view, cx| {
522                execution_view.push_message(&content, cx)
523            })?;
524
525            editor.update(&mut cx, |editor, cx| {
526                let mut replacements = HashMap::default();
527                replacements.insert(
528                    block_id,
529                    (
530                        Some(execution_view.num_lines(cx).saturating_add(1)),
531                        create_output_area_render(execution_view.clone()),
532                    ),
533                );
534                editor.replace_blocks(replacements, None, cx);
535            })?;
536        }
537        anyhow::Ok(())
538    })
539    .detach_and_log_err(cx);
540}
541
542fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
543    let render = move |cx: &mut BlockContext| {
544        let execution_view = execution_view.clone();
545        let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
546        // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
547
548        let gutter_width = cx.gutter_dimensions.width;
549
550        h_flex()
551            .w_full()
552            .bg(cx.theme().colors().background)
553            .border_y_1()
554            .border_color(cx.theme().colors().border)
555            .pl(gutter_width)
556            .child(
557                div()
558                    .font_family(text_font)
559                    // .ml(gutter_width)
560                    .mx_1()
561                    .my_2()
562                    .h_full()
563                    .w_full()
564                    .mr(gutter_width)
565                    .child(execution_view),
566            )
567            .into_any_element()
568    };
569
570    Box::new(render)
571}