1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
6 outputs::{ExecutionStatus, ExecutionView},
7 KernelStatus,
8};
9use client::telemetry::Telemetry;
10use collections::{HashMap, HashSet};
11use editor::{
12 display_map::{
13 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
14 RenderBlock,
15 },
16 scroll::Autoscroll,
17 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
18};
19use futures::FutureExt as _;
20use gpui::{
21 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32use util::ResultExt as _;
33
34pub struct Session {
35 fs: Arc<dyn Fs>,
36 editor: WeakView<Editor>,
37 pub kernel: Kernel,
38 blocks: HashMap<String, EditorBlock>,
39 pub kernel_specification: KernelSpecification,
40 telemetry: Arc<Telemetry>,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: View<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakView<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut ViewContext<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor
63 .upgrade()
64 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
65 let workspace = editor
66 .read(cx)
67 .workspace()
68 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
69
70 let execution_view =
71 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
72
73 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
74 let buffer = editor.buffer().clone();
75 let buffer_snapshot = buffer.read(cx).snapshot(cx);
76 let end_point = code_range.end.to_point(&buffer_snapshot);
77 let next_row_start = end_point + Point::new(1, 0);
78 if next_row_start > buffer_snapshot.max_point() {
79 buffer.update(cx, |buffer, cx| {
80 buffer.edit(
81 [(
82 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
83 "\n",
84 )],
85 None,
86 cx,
87 )
88 });
89 }
90
91 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
92 let block = BlockProperties {
93 placement: BlockPlacement::Below(code_range.end),
94 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
95 height: 1,
96 style: BlockStyle::Sticky,
97 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
98 priority: 0,
99 };
100
101 let block_id = editor.insert_blocks([block], None, cx)[0];
102 (block_id, invalidation_anchor)
103 });
104
105 anyhow::Ok(Self {
106 code_range,
107 invalidation_anchor,
108 block_id,
109 execution_view,
110 })
111 }
112
113 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
114 self.execution_view.update(cx, |execution_view, cx| {
115 execution_view.push_message(&message.content, cx);
116 });
117 }
118
119 fn create_output_area_renderer(
120 execution_view: View<ExecutionView>,
121 on_close: CloseBlockFn,
122 ) -> RenderBlock {
123 Arc::new(move |cx: &mut BlockContext| {
124 let execution_view = execution_view.clone();
125 let text_style = crate::outputs::plain::text_style(cx);
126
127 let gutter = cx.gutter_dimensions;
128
129 let block_id = cx.block_id;
130 let on_close = on_close.clone();
131
132 let rem_size = cx.rem_size();
133
134 let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136 let close_button = h_flex()
137 .flex_none()
138 .items_center()
139 .justify_center()
140 .absolute()
141 .top(text_line_height / 2.)
142 .right(
143 // 2px is a magic number to nudge the button just a bit closer to
144 // the line number start
145 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146 )
147 .w(text_line_height)
148 .h(text_line_height)
149 .child(
150 IconButton::new("close_output_area", IconName::Close)
151 .icon_size(IconSize::Small)
152 .icon_color(Color::Muted)
153 .size(ButtonSize::Compact)
154 .shape(IconButtonShape::Square)
155 .tooltip(|cx| Tooltip::text("Close output area", cx))
156 .on_click(move |_, cx| {
157 if let BlockId::Custom(block_id) = block_id {
158 (on_close)(block_id, cx)
159 }
160 }),
161 );
162
163 div()
164 .id(cx.block_id)
165 .block_mouse_down()
166 .flex()
167 .items_start()
168 .min_h(text_line_height)
169 .w_full()
170 .border_y_1()
171 .border_color(cx.theme().colors().border)
172 .bg(cx.theme().colors().background)
173 .child(
174 div()
175 .relative()
176 .w(gutter.full_width())
177 .h(text_line_height * 2)
178 .child(close_button),
179 )
180 .child(
181 div()
182 .flex_1()
183 .size_full()
184 .py(text_line_height / 2.)
185 .mr(gutter.width)
186 .child(execution_view),
187 )
188 .into_any_element()
189 })
190 }
191}
192
193impl Session {
194 pub fn new(
195 editor: WeakView<Editor>,
196 fs: Arc<dyn Fs>,
197 telemetry: Arc<Telemetry>,
198 kernel_specification: KernelSpecification,
199 cx: &mut ViewContext<Self>,
200 ) -> Self {
201 let subscription = match editor.upgrade() {
202 Some(editor) => {
203 let buffer = editor.read(cx).buffer().clone();
204 cx.subscribe(&buffer, Self::on_buffer_event)
205 }
206 None => Subscription::new(|| {}),
207 };
208
209 let editor_handle = editor.clone();
210
211 editor
212 .update(cx, |editor, _cx| {
213 setup_editor_session_actions(editor, editor_handle);
214 })
215 .ok();
216
217 let mut session = Self {
218 fs,
219 editor,
220 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
221 blocks: HashMap::default(),
222 kernel_specification,
223 _buffer_subscription: subscription,
224 telemetry,
225 };
226
227 session.start_kernel(cx);
228 session
229 }
230
231 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
232 let kernel_language = self.kernel_specification.language();
233 let entity_id = self.editor.entity_id();
234 let working_directory = self
235 .editor
236 .upgrade()
237 .and_then(|editor| editor.read(cx).working_directory(cx))
238 .unwrap_or_else(temp_dir);
239
240 self.telemetry.report_repl_event(
241 kernel_language.into(),
242 KernelStatus::Starting.to_string(),
243 cx.entity_id().to_string(),
244 );
245
246 let session_view = cx.view().clone();
247
248 let kernel = match self.kernel_specification.clone() {
249 KernelSpecification::Jupyter(kernel_specification)
250 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
251 kernel_specification,
252 entity_id,
253 working_directory,
254 self.fs.clone(),
255 session_view,
256 cx,
257 ),
258 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
259 remote_kernel_specification,
260 working_directory,
261 session_view,
262 cx,
263 ),
264 };
265
266 let pending_kernel = cx
267 .spawn(|this, mut cx| async move {
268 let kernel = kernel.await;
269
270 match kernel {
271 Ok(kernel) => {
272 this.update(&mut cx, |session, cx| {
273 session.kernel(Kernel::RunningKernel(kernel), cx);
274 })
275 .ok();
276 }
277 Err(err) => {
278 this.update(&mut cx, |session, cx| {
279 session.kernel_errored(err.to_string(), cx);
280 })
281 .ok();
282 }
283 }
284 })
285 .shared();
286
287 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
288 cx.notify();
289 }
290
291 pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext<Self>) {
292 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
293
294 self.blocks.values().for_each(|block| {
295 block.execution_view.update(cx, |execution_view, cx| {
296 match execution_view.status {
297 ExecutionStatus::Finished => {
298 // Do nothing when the output was good
299 }
300 _ => {
301 // All other cases, set the status to errored
302 execution_view.status =
303 ExecutionStatus::KernelErrored(error_message.clone())
304 }
305 }
306 cx.notify();
307 });
308 });
309 }
310
311 fn on_buffer_event(
312 &mut self,
313 buffer: Model<MultiBuffer>,
314 event: &multi_buffer::Event,
315 cx: &mut ViewContext<Self>,
316 ) {
317 if let multi_buffer::Event::Edited { .. } = event {
318 let snapshot = buffer.read(cx).snapshot(cx);
319
320 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
321
322 self.blocks.retain(|_id, block| {
323 if block.invalidation_anchor.is_valid(&snapshot) {
324 true
325 } else {
326 blocks_to_remove.insert(block.block_id);
327 false
328 }
329 });
330
331 if !blocks_to_remove.is_empty() {
332 self.editor
333 .update(cx, |editor, cx| {
334 editor.remove_blocks(blocks_to_remove, None, cx);
335 })
336 .ok();
337 cx.notify();
338 }
339 }
340 }
341
342 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
343 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
344 kernel.request_tx().try_send(message).ok();
345 }
346
347 anyhow::Ok(())
348 }
349
350 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
351 let blocks_to_remove: HashSet<CustomBlockId> =
352 self.blocks.values().map(|block| block.block_id).collect();
353
354 self.editor
355 .update(cx, |editor, cx| {
356 editor.remove_blocks(blocks_to_remove, None, cx);
357 })
358 .ok();
359
360 self.blocks.clear();
361 }
362
363 pub fn execute(
364 &mut self,
365 code: String,
366 anchor_range: Range<Anchor>,
367 next_cell: Option<Anchor>,
368 move_down: bool,
369 cx: &mut ViewContext<Self>,
370 ) {
371 let Some(editor) = self.editor.upgrade() else {
372 return;
373 };
374
375 if code.is_empty() {
376 return;
377 }
378
379 let execute_request = ExecuteRequest {
380 code,
381 ..ExecuteRequest::default()
382 };
383
384 let message: JupyterMessage = execute_request.into();
385
386 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
387
388 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
389
390 self.blocks.retain(|_key, block| {
391 if anchor_range.overlaps(&block.code_range, &buffer) {
392 blocks_to_remove.insert(block.block_id);
393 false
394 } else {
395 true
396 }
397 });
398
399 self.editor
400 .update(cx, |editor, cx| {
401 editor.remove_blocks(blocks_to_remove, None, cx);
402 })
403 .ok();
404
405 let status = match &self.kernel {
406 Kernel::Restarting => ExecutionStatus::Restarting,
407 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
408 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
409 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
410 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
411 Kernel::Shutdown => ExecutionStatus::Shutdown,
412 };
413
414 let parent_message_id = message.header.msg_id.clone();
415 let session_view = cx.view().downgrade();
416 let weak_editor = self.editor.clone();
417
418 let on_close: CloseBlockFn =
419 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
420 if let Some(session) = session_view.upgrade() {
421 session.update(cx, |session, cx| {
422 session.blocks.remove(&parent_message_id);
423 cx.notify();
424 });
425 }
426
427 if let Some(editor) = weak_editor.upgrade() {
428 editor.update(cx, |editor, cx| {
429 let mut block_ids = HashSet::default();
430 block_ids.insert(block_id);
431 editor.remove_blocks(block_ids, None, cx);
432 });
433 }
434 });
435
436 let Ok(editor_block) =
437 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
438 else {
439 return;
440 };
441
442 let new_cursor_pos = if let Some(next_cursor) = next_cell {
443 next_cursor
444 } else {
445 editor_block.invalidation_anchor
446 };
447
448 self.blocks
449 .insert(message.header.msg_id.clone(), editor_block);
450
451 match &self.kernel {
452 Kernel::RunningKernel(_) => {
453 self.send(message, cx).ok();
454 }
455 Kernel::StartingKernel(task) => {
456 // Queue up the execution as a task to run after the kernel starts
457 let task = task.clone();
458 let message = message.clone();
459
460 cx.spawn(|this, mut cx| async move {
461 task.await;
462 this.update(&mut cx, |session, cx| {
463 session.send(message, cx).ok();
464 })
465 .ok();
466 })
467 .detach();
468 }
469 _ => {}
470 }
471
472 if move_down {
473 editor.update(cx, move |editor, cx| {
474 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
475 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
476 });
477 });
478 }
479 }
480
481 pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
482 let parent_message_id = match message.parent_header.as_ref() {
483 Some(header) => &header.msg_id,
484 None => return,
485 };
486
487 match &message.content {
488 JupyterMessageContent::Status(status) => {
489 self.kernel.set_execution_state(&status.execution_state);
490
491 self.telemetry.report_repl_event(
492 self.kernel_specification.language().into(),
493 KernelStatus::from(&self.kernel).to_string(),
494 cx.entity_id().to_string(),
495 );
496
497 cx.notify();
498 }
499 JupyterMessageContent::KernelInfoReply(reply) => {
500 self.kernel.set_kernel_info(reply);
501 cx.notify();
502 }
503 JupyterMessageContent::UpdateDisplayData(update) => {
504 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
505 display_id
506 } else {
507 return;
508 };
509
510 self.blocks.iter_mut().for_each(|(_, block)| {
511 block.execution_view.update(cx, |execution_view, cx| {
512 execution_view.update_display_data(&update.data, &display_id, cx);
513 });
514 });
515 return;
516 }
517 _ => {}
518 }
519
520 if let Some(block) = self.blocks.get_mut(parent_message_id) {
521 block.handle_message(message, cx);
522 }
523 }
524
525 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
526 match &mut self.kernel {
527 Kernel::RunningKernel(_kernel) => {
528 self.send(InterruptRequest {}.into(), cx).ok();
529 }
530 Kernel::StartingKernel(_task) => {
531 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
532 }
533 _ => {}
534 }
535 }
536
537 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
538 if let Kernel::Shutdown = kernel {
539 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
540 }
541
542 let kernel_status = KernelStatus::from(&kernel).to_string();
543 let kernel_language = self.kernel_specification.language().into();
544
545 self.telemetry.report_repl_event(
546 kernel_language,
547 kernel_status,
548 cx.entity_id().to_string(),
549 );
550
551 self.kernel = kernel;
552 }
553
554 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
555 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
556
557 match kernel {
558 Kernel::RunningKernel(mut kernel) => {
559 let mut request_tx = kernel.request_tx().clone();
560
561 let forced = kernel.force_shutdown(cx);
562
563 cx.spawn(|this, mut cx| async move {
564 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
565 request_tx.try_send(message).ok();
566
567 forced.await.log_err();
568
569 // Give the kernel a bit of time to clean up
570 cx.background_executor().timer(Duration::from_secs(3)).await;
571
572 this.update(&mut cx, |session, cx| {
573 session.clear_outputs(cx);
574 session.kernel(Kernel::Shutdown, cx);
575 cx.notify();
576 })
577 .ok();
578 })
579 .detach();
580 }
581 _ => {
582 self.kernel(Kernel::Shutdown, cx);
583 }
584 }
585 cx.notify();
586 }
587
588 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
589 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
590
591 match kernel {
592 Kernel::Restarting => {
593 // Do nothing if already restarting
594 }
595 Kernel::RunningKernel(mut kernel) => {
596 let mut request_tx = kernel.request_tx().clone();
597
598 let forced = kernel.force_shutdown(cx);
599
600 cx.spawn(|this, mut cx| async move {
601 // Send shutdown request with restart flag
602 log::debug!("restarting kernel");
603 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
604 request_tx.try_send(message).ok();
605
606 // Wait for kernel to shutdown
607 cx.background_executor().timer(Duration::from_secs(1)).await;
608
609 // Force kill the kernel if it hasn't shut down
610 forced.await.log_err();
611
612 // Start a new kernel
613 this.update(&mut cx, |session, cx| {
614 // todo!(): Differentiate between restart and restart+clear-outputs
615 session.clear_outputs(cx);
616 session.start_kernel(cx);
617 })
618 .ok();
619 })
620 .detach();
621 }
622 _ => {
623 self.clear_outputs(cx);
624 self.start_kernel(cx);
625 }
626 }
627 cx.notify();
628 }
629}
630
631pub enum SessionEvent {
632 Shutdown(WeakView<Editor>),
633}
634
635impl EventEmitter<SessionEvent> for Session {}
636
637impl Render for Session {
638 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
639 let (status_text, interrupt_button) = match &self.kernel {
640 Kernel::RunningKernel(kernel) => (
641 kernel
642 .kernel_info()
643 .as_ref()
644 .map(|info| info.language_info.name.clone()),
645 Some(
646 Button::new("interrupt", "Interrupt")
647 .style(ButtonStyle::Subtle)
648 .on_click(cx.listener(move |session, _, cx| {
649 session.interrupt(cx);
650 })),
651 ),
652 ),
653 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
654 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
655 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
656 Kernel::Shutdown => (Some("Shutdown".into()), None),
657 Kernel::Restarting => (Some("Restarting".into()), None),
658 };
659
660 KernelListItem::new(self.kernel_specification.clone())
661 .status_color(match &self.kernel {
662 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
663 ExecutionState::Idle => Color::Success,
664 ExecutionState::Busy => Color::Modified,
665 },
666 Kernel::StartingKernel(_) => Color::Modified,
667 Kernel::ErroredLaunch(_) => Color::Error,
668 Kernel::ShuttingDown => Color::Modified,
669 Kernel::Shutdown => Color::Disabled,
670 Kernel::Restarting => Color::Modified,
671 })
672 .child(Label::new(self.kernel_specification.name()))
673 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
674 .button(
675 Button::new("shutdown", "Shutdown")
676 .style(ButtonStyle::Subtle)
677 .disabled(self.kernel.is_shutting_down())
678 .on_click(cx.listener(move |session, _, cx| {
679 session.shutdown(cx);
680 })),
681 )
682 .buttons(interrupt_button)
683 }
684}