1use anyhow::{anyhow, Context as _, Result};
2use async_dispatcher::{set_dispatcher, timeout, Dispatcher, Runnable};
3use collections::{HashMap, HashSet};
4use editor::{
5 display_map::{
6 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
7 },
8 Anchor, AnchorRangeExt, Editor,
9};
10use futures::{
11 channel::mpsc::{self, UnboundedSender},
12 future::Shared,
13 Future, FutureExt, SinkExt as _, StreamExt,
14};
15use gpui::prelude::*;
16use gpui::{
17 actions, AppContext, Context, EntityId, Global, Model, ModelContext, PlatformDispatcher, Task,
18 WeakView,
19};
20use gpui::{Entity, View};
21use language::Point;
22use outputs::{ExecutionStatus, ExecutionView, LineHeight as _};
23use project::Fs;
24use runtime_settings::JupyterSettings;
25use runtimelib::JupyterMessageContent;
26use settings::{Settings as _, SettingsStore};
27use std::{ops::Range, time::Instant};
28use std::{sync::Arc, time::Duration};
29use theme::{ActiveTheme, ThemeSettings};
30use ui::prelude::*;
31use workspace::Workspace;
32
33mod outputs;
34// mod runtime_panel;
35mod runtime_settings;
36mod runtimes;
37mod stdio;
38
39use runtimes::{get_runtime_specifications, Request, RunningKernel, RuntimeSpecification};
40
41actions!(repl, [Run]);
42
43#[derive(Clone)]
44pub struct RuntimeManagerGlobal(Model<RuntimeManager>);
45
46impl Global for RuntimeManagerGlobal {}
47
48pub fn zed_dispatcher(cx: &mut AppContext) -> impl Dispatcher {
49 struct ZedDispatcher {
50 dispatcher: Arc<dyn PlatformDispatcher>,
51 }
52
53 // PlatformDispatcher is _super_ close to the same interface we put in
54 // async-dispatcher, except for the task label in dispatch. Later we should
55 // just make that consistent so we have this dispatcher ready to go for
56 // other crates in Zed.
57 impl Dispatcher for ZedDispatcher {
58 fn dispatch(&self, runnable: Runnable) {
59 self.dispatcher.dispatch(runnable, None)
60 }
61
62 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
63 self.dispatcher.dispatch_after(duration, runnable);
64 }
65 }
66
67 ZedDispatcher {
68 dispatcher: cx.background_executor().dispatcher.clone(),
69 }
70}
71
72pub fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
73 set_dispatcher(zed_dispatcher(cx));
74 JupyterSettings::register(cx);
75
76 observe_jupyter_settings_changes(fs.clone(), cx);
77
78 cx.observe_new_views(
79 |workspace: &mut Workspace, _: &mut ViewContext<Workspace>| {
80 workspace.register_action(run);
81 },
82 )
83 .detach();
84
85 let settings = JupyterSettings::get_global(cx);
86
87 if !settings.enabled {
88 return;
89 }
90
91 initialize_runtime_manager(fs, cx);
92}
93
94fn initialize_runtime_manager(fs: Arc<dyn Fs>, cx: &mut AppContext) {
95 let runtime_manager = cx.new_model(|cx| RuntimeManager::new(fs.clone(), cx));
96 RuntimeManager::set_global(runtime_manager.clone(), cx);
97
98 cx.spawn(|mut cx| async move {
99 let fs = fs.clone();
100
101 let runtime_specifications = get_runtime_specifications(fs).await?;
102
103 runtime_manager.update(&mut cx, |this, _cx| {
104 this.runtime_specifications = runtime_specifications;
105 })?;
106
107 anyhow::Ok(())
108 })
109 .detach_and_log_err(cx);
110}
111
112fn observe_jupyter_settings_changes(fs: Arc<dyn Fs>, cx: &mut AppContext) {
113 cx.observe_global::<SettingsStore>(move |cx| {
114 let settings = JupyterSettings::get_global(cx);
115 if settings.enabled && RuntimeManager::global(cx).is_none() {
116 initialize_runtime_manager(fs.clone(), cx);
117 } else {
118 RuntimeManager::remove_global(cx);
119 // todo!(): Remove action from workspace(s)
120 }
121 })
122 .detach();
123}
124
125#[derive(Debug)]
126pub enum Kernel {
127 RunningKernel(RunningKernel),
128 StartingKernel(Shared<Task<()>>),
129 FailedLaunch,
130}
131
132// Per workspace
133pub struct RuntimeManager {
134 fs: Arc<dyn Fs>,
135 runtime_specifications: Vec<RuntimeSpecification>,
136
137 instances: HashMap<EntityId, Kernel>,
138 editors: HashMap<WeakView<Editor>, EditorRuntimeState>,
139 // todo!(): Next
140 // To reduce the number of open tasks and channels we have, let's feed the response
141 // messages by ID over to the paired ExecutionView
142 _execution_views_by_id: HashMap<String, View<ExecutionView>>,
143}
144
145#[derive(Debug, Clone)]
146struct EditorRuntimeState {
147 blocks: Vec<EditorRuntimeBlock>,
148 // todo!(): Store a subscription to the editor so we can drop them when the editor is dropped
149 // subscription: gpui::Subscription,
150}
151
152#[derive(Debug, Clone)]
153struct EditorRuntimeBlock {
154 code_range: Range<Anchor>,
155 _execution_id: String,
156 block_id: BlockId,
157 _execution_view: View<ExecutionView>,
158}
159
160impl RuntimeManager {
161 pub fn new(fs: Arc<dyn Fs>, _cx: &mut AppContext) -> Self {
162 Self {
163 fs,
164 runtime_specifications: Default::default(),
165 instances: Default::default(),
166 editors: Default::default(),
167 _execution_views_by_id: Default::default(),
168 }
169 }
170
171 fn get_or_launch_kernel(
172 &mut self,
173 entity_id: EntityId,
174 language_name: Arc<str>,
175 cx: &mut ModelContext<Self>,
176 ) -> Task<Result<UnboundedSender<Request>>> {
177 let kernel = self.instances.get(&entity_id);
178 let pending_kernel_start = match kernel {
179 Some(Kernel::RunningKernel(running_kernel)) => {
180 return Task::ready(anyhow::Ok(running_kernel.request_tx.clone()));
181 }
182 Some(Kernel::StartingKernel(task)) => task.clone(),
183 Some(Kernel::FailedLaunch) | None => {
184 self.instances.remove(&entity_id);
185
186 let kernel = self.launch_kernel(entity_id, language_name, cx);
187 let pending_kernel = cx
188 .spawn(|this, mut cx| async move {
189 let running_kernel = kernel.await;
190
191 match running_kernel {
192 Ok(running_kernel) => {
193 let _ = this.update(&mut cx, |this, _cx| {
194 this.instances
195 .insert(entity_id, Kernel::RunningKernel(running_kernel));
196 });
197 }
198 Err(_err) => {
199 let _ = this.update(&mut cx, |this, _cx| {
200 this.instances.insert(entity_id, Kernel::FailedLaunch);
201 });
202 }
203 }
204 })
205 .shared();
206
207 self.instances
208 .insert(entity_id, Kernel::StartingKernel(pending_kernel.clone()));
209
210 pending_kernel
211 }
212 };
213
214 cx.spawn(|this, mut cx| async move {
215 pending_kernel_start.await;
216
217 this.update(&mut cx, |this, _cx| {
218 let kernel = this
219 .instances
220 .get(&entity_id)
221 .ok_or(anyhow!("unable to get a running kernel"))?;
222
223 match kernel {
224 Kernel::RunningKernel(running_kernel) => Ok(running_kernel.request_tx.clone()),
225 _ => Err(anyhow!("unable to get a running kernel")),
226 }
227 })?
228 })
229 }
230
231 fn launch_kernel(
232 &mut self,
233 entity_id: EntityId,
234 language_name: Arc<str>,
235 cx: &mut ModelContext<Self>,
236 ) -> Task<Result<RunningKernel>> {
237 // Get first runtime that matches the language name (for now)
238 let runtime_specification =
239 self.runtime_specifications
240 .iter()
241 .find(|runtime_specification| {
242 runtime_specification.kernelspec.language == language_name.to_string()
243 });
244
245 let runtime_specification = match runtime_specification {
246 Some(runtime_specification) => runtime_specification,
247 None => {
248 return Task::ready(Err(anyhow::anyhow!(
249 "No runtime found for language {}",
250 language_name
251 )));
252 }
253 };
254
255 let runtime_specification = runtime_specification.clone();
256
257 let fs = self.fs.clone();
258
259 cx.spawn(|_, cx| async move {
260 let running_kernel =
261 RunningKernel::new(runtime_specification, entity_id, fs.clone(), cx);
262
263 let running_kernel = running_kernel.await?;
264
265 let mut request_tx = running_kernel.request_tx.clone();
266
267 let overall_timeout_duration = Duration::from_secs(10);
268
269 let start_time = Instant::now();
270
271 loop {
272 if start_time.elapsed() > overall_timeout_duration {
273 // todo!(): Kill the kernel
274 return Err(anyhow::anyhow!("Kernel did not respond in time"));
275 }
276
277 let (tx, rx) = mpsc::unbounded();
278 match request_tx
279 .send(Request {
280 request: runtimelib::KernelInfoRequest {}.into(),
281 responses_rx: tx,
282 })
283 .await
284 {
285 Ok(_) => {}
286 Err(_err) => {
287 break;
288 }
289 };
290
291 let mut rx = rx.fuse();
292
293 let kernel_info_timeout = Duration::from_secs(1);
294
295 let mut got_kernel_info = false;
296 while let Ok(Some(message)) = timeout(kernel_info_timeout, rx.next()).await {
297 match message {
298 JupyterMessageContent::KernelInfoReply(_) => {
299 got_kernel_info = true;
300 }
301 _ => {}
302 }
303 }
304
305 if got_kernel_info {
306 break;
307 }
308 }
309
310 anyhow::Ok(running_kernel)
311 })
312 }
313
314 fn execute_code(
315 &mut self,
316 entity_id: EntityId,
317 language_name: Arc<str>,
318 code: String,
319 cx: &mut ModelContext<Self>,
320 ) -> impl Future<Output = Result<mpsc::UnboundedReceiver<JupyterMessageContent>>> {
321 let (tx, rx) = mpsc::unbounded();
322
323 let request_tx = self.get_or_launch_kernel(entity_id, language_name, cx);
324
325 async move {
326 let request_tx = request_tx.await?;
327
328 request_tx
329 .unbounded_send(Request {
330 request: runtimelib::ExecuteRequest {
331 code,
332 allow_stdin: false,
333 silent: false,
334 store_history: true,
335 stop_on_error: true,
336 ..Default::default()
337 }
338 .into(),
339 responses_rx: tx,
340 })
341 .context("Failed to send execution request")?;
342
343 Ok(rx)
344 }
345 }
346
347 pub fn global(cx: &AppContext) -> Option<Model<Self>> {
348 cx.try_global::<RuntimeManagerGlobal>()
349 .map(|runtime_manager| runtime_manager.0.clone())
350 }
351
352 pub fn set_global(runtime_manager: Model<Self>, cx: &mut AppContext) {
353 cx.set_global(RuntimeManagerGlobal(runtime_manager));
354 }
355
356 pub fn remove_global(cx: &mut AppContext) {
357 if RuntimeManager::global(cx).is_some() {
358 cx.remove_global::<RuntimeManagerGlobal>();
359 }
360 }
361}
362
363pub fn get_active_editor(
364 workspace: &mut Workspace,
365 cx: &mut ViewContext<Workspace>,
366) -> Option<View<Editor>> {
367 workspace
368 .active_item(cx)
369 .and_then(|item| item.act_as::<Editor>(cx))
370}
371
372// Gets the active selection in the editor or the current line
373pub fn selection(editor: View<Editor>, cx: &mut ViewContext<Workspace>) -> Range<Anchor> {
374 let editor = editor.read(cx);
375 let selection = editor.selections.newest::<usize>(cx);
376 let buffer = editor.buffer().read(cx).snapshot(cx);
377
378 let range = if selection.is_empty() {
379 let cursor = selection.head();
380
381 let line_start = buffer.offset_to_point(cursor).row;
382 let mut start_offset = buffer.point_to_offset(Point::new(line_start, 0));
383
384 // Iterate backwards to find the start of the line
385 while start_offset > 0 {
386 let ch = buffer.chars_at(start_offset - 1).next().unwrap_or('\0');
387 if ch == '\n' {
388 break;
389 }
390 start_offset -= 1;
391 }
392
393 let mut end_offset = cursor;
394
395 // Iterate forwards to find the end of the line
396 while end_offset < buffer.len() {
397 let ch = buffer.chars_at(end_offset).next().unwrap_or('\0');
398 if ch == '\n' {
399 break;
400 }
401 end_offset += 1;
402 }
403
404 // Create a range from the start to the end of the line
405 start_offset..end_offset
406 } else {
407 selection.range()
408 };
409
410 let anchor_range = buffer.anchor_before(range.start)..buffer.anchor_after(range.end);
411 anchor_range
412}
413
414pub fn run(workspace: &mut Workspace, _: &Run, cx: &mut ViewContext<Workspace>) {
415 let (editor, runtime_manager) = if let (Some(editor), Some(runtime_manager)) =
416 (get_active_editor(workspace, cx), RuntimeManager::global(cx))
417 {
418 (editor, runtime_manager)
419 } else {
420 log::warn!("No active editor or runtime manager found");
421 return;
422 };
423
424 let anchor_range = selection(editor.clone(), cx);
425
426 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
427
428 let selected_text = buffer
429 .text_for_range(anchor_range.clone())
430 .collect::<String>();
431
432 let start_language = buffer.language_at(anchor_range.start);
433 let end_language = buffer.language_at(anchor_range.end);
434
435 let language_name = if start_language == end_language {
436 start_language
437 .map(|language| language.code_fence_block_name())
438 .filter(|lang| **lang != *"markdown")
439 } else {
440 // If the selection spans multiple languages, don't run it
441 return;
442 };
443
444 let language_name = if let Some(language_name) = language_name {
445 language_name
446 } else {
447 return;
448 };
449
450 let entity_id = editor.entity_id();
451
452 let execution_view = cx.new_view(|cx| ExecutionView::new(cx));
453
454 // If any block overlaps with the new block, remove it
455 // TODO: When inserting a new block, put it in order so that search is efficient
456 let blocks_to_remove = runtime_manager.update(cx, |runtime_manager, _cx| {
457 // Get the current `EditorRuntimeState` for this runtime_manager, inserting it if it doesn't exist
458 let editor_runtime_state = runtime_manager
459 .editors
460 .entry(editor.downgrade())
461 .or_insert_with(|| EditorRuntimeState { blocks: Vec::new() });
462
463 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
464
465 editor_runtime_state.blocks.retain(|block| {
466 if anchor_range.overlaps(&block.code_range, &buffer) {
467 blocks_to_remove.insert(block.block_id);
468 // Drop this block
469 false
470 } else {
471 true
472 }
473 });
474
475 blocks_to_remove
476 });
477
478 let blocks_to_remove = blocks_to_remove.clone();
479
480 let block_id = editor.update(cx, |editor, cx| {
481 editor.remove_blocks(blocks_to_remove, None, cx);
482 let block = BlockProperties {
483 position: anchor_range.end,
484 height: execution_view.num_lines(cx).saturating_add(1),
485 style: BlockStyle::Sticky,
486 render: create_output_area_render(execution_view.clone()),
487 disposition: BlockDisposition::Below,
488 };
489
490 editor.insert_blocks([block], None, cx)[0]
491 });
492
493 let receiver = runtime_manager.update(cx, |runtime_manager, cx| {
494 let editor_runtime_state = runtime_manager
495 .editors
496 .entry(editor.downgrade())
497 .or_insert_with(|| EditorRuntimeState { blocks: Vec::new() });
498
499 let editor_runtime_block = EditorRuntimeBlock {
500 code_range: anchor_range.clone(),
501 block_id,
502 _execution_view: execution_view.clone(),
503 _execution_id: Default::default(),
504 };
505
506 editor_runtime_state
507 .blocks
508 .push(editor_runtime_block.clone());
509
510 runtime_manager.execute_code(entity_id, language_name, selected_text.clone(), cx)
511 });
512
513 cx.spawn(|_this, mut cx| async move {
514 execution_view.update(&mut cx, |execution_view, cx| {
515 execution_view.set_status(ExecutionStatus::ConnectingToKernel, cx);
516 })?;
517 let mut receiver = receiver.await?;
518
519 let execution_view = execution_view.clone();
520 while let Some(content) = receiver.next().await {
521 execution_view.update(&mut cx, |execution_view, cx| {
522 execution_view.push_message(&content, cx)
523 })?;
524
525 editor.update(&mut cx, |editor, cx| {
526 let mut replacements = HashMap::default();
527 replacements.insert(
528 block_id,
529 (
530 Some(execution_view.num_lines(cx).saturating_add(1)),
531 create_output_area_render(execution_view.clone()),
532 ),
533 );
534 editor.replace_blocks(replacements, None, cx);
535 })?;
536 }
537 anyhow::Ok(())
538 })
539 .detach_and_log_err(cx);
540}
541
542fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
543 let render = move |cx: &mut BlockContext| {
544 let execution_view = execution_view.clone();
545 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
546 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
547
548 let gutter_width = cx.gutter_dimensions.width;
549
550 h_flex()
551 .w_full()
552 .bg(cx.theme().colors().background)
553 .border_y_1()
554 .border_color(cx.theme().colors().border)
555 .pl(gutter_width)
556 .child(
557 div()
558 .font_family(text_font)
559 // .ml(gutter_width)
560 .mx_1()
561 .my_2()
562 .h_full()
563 .w_full()
564 .mr(gutter_width)
565 .child(execution_view),
566 )
567 .into_any_element()
568 };
569
570 Box::new(render)
571}