1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 KernelStatus,
6 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
7 outputs::{ExecutionStatus, ExecutionView},
8};
9use anyhow::Context as _;
10use collections::{HashMap, HashSet};
11use editor::SelectionEffects;
12use editor::{
13 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
14 display_map::{
15 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
16 RenderBlock,
17 },
18 scroll::Autoscroll,
19};
20use futures::FutureExt as _;
21use gpui::{
22 Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
23};
24use language::Point;
25use project::Fs;
26use runtimelib::{
27 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
28 ShutdownRequest,
29};
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::ActiveTheme;
32use ui::{IconButtonShape, Tooltip, prelude::*};
33use util::ResultExt as _;
34
35pub struct Session {
36 fs: Arc<dyn Fs>,
37 editor: WeakEntity<Editor>,
38 pub kernel: Kernel,
39 blocks: HashMap<String, EditorBlock>,
40 pub kernel_specification: KernelSpecification,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: Entity<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakEntity<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut Context<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor.upgrade().context("editor is not open")?;
63 let workspace = editor.read(cx).workspace().context("workspace dropped")?;
64
65 let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
66
67 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
68 let buffer = editor.buffer().clone();
69 let buffer_snapshot = buffer.read(cx).snapshot(cx);
70 let end_point = code_range.end.to_point(&buffer_snapshot);
71 let next_row_start = end_point + Point::new(1, 0);
72 if next_row_start > buffer_snapshot.max_point() {
73 buffer.update(cx, |buffer, cx| {
74 buffer.edit(
75 [(
76 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
77 "\n",
78 )],
79 None,
80 cx,
81 )
82 });
83 }
84
85 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
86 let block = BlockProperties {
87 placement: BlockPlacement::Below(code_range.end),
88 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
89 height: Some(1),
90 style: BlockStyle::Sticky,
91 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
92 priority: 0,
93 };
94
95 let block_id = editor.insert_blocks([block], None, cx)[0];
96 (block_id, invalidation_anchor)
97 });
98
99 anyhow::Ok(Self {
100 code_range,
101 invalidation_anchor,
102 block_id,
103 execution_view,
104 })
105 }
106
107 fn handle_message(
108 &mut self,
109 message: &JupyterMessage,
110 window: &mut Window,
111 cx: &mut Context<Session>,
112 ) {
113 self.execution_view.update(cx, |execution_view, cx| {
114 execution_view.push_message(&message.content, window, cx);
115 });
116 }
117
118 fn create_output_area_renderer(
119 execution_view: Entity<ExecutionView>,
120 on_close: CloseBlockFn,
121 ) -> RenderBlock {
122 Arc::new(move |cx: &mut BlockContext| {
123 let execution_view = execution_view.clone();
124 let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
125
126 let editor_margins = cx.margins;
127 let gutter = editor_margins.gutter;
128
129 let block_id = cx.block_id;
130 let on_close = on_close.clone();
131
132 let rem_size = cx.window.rem_size();
133
134 let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136 let close_button = h_flex()
137 .flex_none()
138 .items_center()
139 .justify_center()
140 .absolute()
141 .top(text_line_height / 2.)
142 .right(
143 // 2px is a magic number to nudge the button just a bit closer to
144 // the line number start
145 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146 )
147 .w(text_line_height)
148 .h(text_line_height)
149 .child(
150 IconButton::new("close_output_area", IconName::Close)
151 .icon_size(IconSize::Small)
152 .icon_color(Color::Muted)
153 .size(ButtonSize::Compact)
154 .shape(IconButtonShape::Square)
155 .tooltip(Tooltip::text("Close output area"))
156 .on_click(move |_, window, cx| {
157 if let BlockId::Custom(block_id) = block_id {
158 (on_close)(block_id, window, cx)
159 }
160 }),
161 );
162
163 div()
164 .id(cx.block_id)
165 .block_mouse_except_scroll()
166 .flex()
167 .items_start()
168 .min_h(text_line_height)
169 .w_full()
170 .border_y_1()
171 .border_color(cx.theme().colors().border)
172 .bg(cx.theme().colors().background)
173 .child(
174 div()
175 .relative()
176 .w(gutter.full_width())
177 .h(text_line_height * 2)
178 .child(close_button),
179 )
180 .child(
181 div()
182 .flex_1()
183 .size_full()
184 .py(text_line_height / 2.)
185 .mr(editor_margins.right)
186 .pr_2()
187 .child(execution_view),
188 )
189 .into_any_element()
190 })
191 }
192}
193
194impl Session {
195 pub fn new(
196 editor: WeakEntity<Editor>,
197 fs: Arc<dyn Fs>,
198 kernel_specification: KernelSpecification,
199 window: &mut Window,
200 cx: &mut Context<Self>,
201 ) -> Self {
202 let subscription = match editor.upgrade() {
203 Some(editor) => {
204 let buffer = editor.read(cx).buffer().clone();
205 cx.subscribe(&buffer, Self::on_buffer_event)
206 }
207 None => Subscription::new(|| {}),
208 };
209
210 let editor_handle = editor.clone();
211
212 editor
213 .update(cx, |editor, _cx| {
214 setup_editor_session_actions(editor, editor_handle);
215 })
216 .ok();
217
218 let mut session = Self {
219 fs,
220 editor,
221 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
222 blocks: HashMap::default(),
223 kernel_specification,
224 _buffer_subscription: subscription,
225 };
226
227 session.start_kernel(window, cx);
228 session
229 }
230
231 fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
232 let kernel_language = self.kernel_specification.language();
233 let entity_id = self.editor.entity_id();
234 let working_directory = self
235 .editor
236 .upgrade()
237 .and_then(|editor| editor.read(cx).working_directory(cx))
238 .unwrap_or_else(temp_dir);
239
240 telemetry::event!(
241 "Kernel Status Changed",
242 kernel_language,
243 kernel_status = KernelStatus::Starting.to_string(),
244 repl_session_id = cx.entity_id().to_string(),
245 );
246
247 let session_view = cx.entity();
248
249 let kernel = match self.kernel_specification.clone() {
250 KernelSpecification::Jupyter(kernel_specification)
251 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
252 kernel_specification,
253 entity_id,
254 working_directory,
255 self.fs.clone(),
256 session_view,
257 window,
258 cx,
259 ),
260 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
261 remote_kernel_specification,
262 working_directory,
263 session_view,
264 window,
265 cx,
266 ),
267 };
268
269 let pending_kernel = cx
270 .spawn(async move |this, cx| {
271 let kernel = kernel.await;
272
273 match kernel {
274 Ok(kernel) => {
275 this.update(cx, |session, cx| {
276 session.kernel(Kernel::RunningKernel(kernel), cx);
277 })
278 .ok();
279 }
280 Err(err) => {
281 this.update(cx, |session, cx| {
282 session.kernel_errored(err.to_string(), cx);
283 })
284 .ok();
285 }
286 }
287 })
288 .shared();
289
290 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
291 cx.notify();
292 }
293
294 pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
295 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
296
297 self.blocks.values().for_each(|block| {
298 block.execution_view.update(cx, |execution_view, cx| {
299 match execution_view.status {
300 ExecutionStatus::Finished => {
301 // Do nothing when the output was good
302 }
303 _ => {
304 // All other cases, set the status to errored
305 execution_view.status =
306 ExecutionStatus::KernelErrored(error_message.clone())
307 }
308 }
309 cx.notify();
310 });
311 });
312 }
313
314 fn on_buffer_event(
315 &mut self,
316 buffer: Entity<MultiBuffer>,
317 event: &multi_buffer::Event,
318 cx: &mut Context<Self>,
319 ) {
320 if let multi_buffer::Event::Edited { .. } = event {
321 let snapshot = buffer.read(cx).snapshot(cx);
322
323 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
324
325 self.blocks.retain(|_id, block| {
326 if block.invalidation_anchor.is_valid(&snapshot) {
327 true
328 } else {
329 blocks_to_remove.insert(block.block_id);
330 false
331 }
332 });
333
334 if !blocks_to_remove.is_empty() {
335 self.editor
336 .update(cx, |editor, cx| {
337 editor.remove_blocks(blocks_to_remove, None, cx);
338 })
339 .ok();
340 cx.notify();
341 }
342 }
343 }
344
345 fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
346 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
347 kernel.request_tx().try_send(message).ok();
348 }
349
350 anyhow::Ok(())
351 }
352
353 pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
354 let blocks_to_remove: HashSet<CustomBlockId> =
355 self.blocks.values().map(|block| block.block_id).collect();
356
357 self.editor
358 .update(cx, |editor, cx| {
359 editor.remove_blocks(blocks_to_remove, None, cx);
360 })
361 .ok();
362
363 self.blocks.clear();
364 }
365
366 pub fn execute(
367 &mut self,
368 code: String,
369 anchor_range: Range<Anchor>,
370 next_cell: Option<Anchor>,
371 move_down: bool,
372 window: &mut Window,
373 cx: &mut Context<Self>,
374 ) {
375 let Some(editor) = self.editor.upgrade() else {
376 return;
377 };
378
379 if code.is_empty() {
380 return;
381 }
382
383 let execute_request = ExecuteRequest {
384 code,
385 ..ExecuteRequest::default()
386 };
387
388 let message: JupyterMessage = execute_request.into();
389
390 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
391
392 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
393
394 self.blocks.retain(|_key, block| {
395 if anchor_range.overlaps(&block.code_range, &buffer) {
396 blocks_to_remove.insert(block.block_id);
397 false
398 } else {
399 true
400 }
401 });
402
403 self.editor
404 .update(cx, |editor, cx| {
405 editor.remove_blocks(blocks_to_remove, None, cx);
406 })
407 .ok();
408
409 let status = match &self.kernel {
410 Kernel::Restarting => ExecutionStatus::Restarting,
411 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
412 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
413 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
414 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
415 Kernel::Shutdown => ExecutionStatus::Shutdown,
416 };
417
418 let parent_message_id = message.header.msg_id.clone();
419 let session_view = cx.entity().downgrade();
420 let weak_editor = self.editor.clone();
421
422 let on_close: CloseBlockFn = Arc::new(
423 move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
424 if let Some(session) = session_view.upgrade() {
425 session.update(cx, |session, cx| {
426 session.blocks.remove(&parent_message_id);
427 cx.notify();
428 });
429 }
430
431 if let Some(editor) = weak_editor.upgrade() {
432 editor.update(cx, |editor, cx| {
433 let mut block_ids = HashSet::default();
434 block_ids.insert(block_id);
435 editor.remove_blocks(block_ids, None, cx);
436 });
437 }
438 },
439 );
440
441 let Ok(editor_block) =
442 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
443 else {
444 return;
445 };
446
447 let new_cursor_pos = if let Some(next_cursor) = next_cell {
448 next_cursor
449 } else {
450 editor_block.invalidation_anchor
451 };
452
453 self.blocks
454 .insert(message.header.msg_id.clone(), editor_block);
455
456 match &self.kernel {
457 Kernel::RunningKernel(_) => {
458 self.send(message, cx).ok();
459 }
460 Kernel::StartingKernel(task) => {
461 // Queue up the execution as a task to run after the kernel starts
462 let task = task.clone();
463
464 cx.spawn(async move |this, cx| {
465 task.await;
466 this.update(cx, |session, cx| {
467 session.send(message, cx).ok();
468 })
469 .ok();
470 })
471 .detach();
472 }
473 _ => {}
474 }
475
476 if move_down {
477 editor.update(cx, move |editor, cx| {
478 editor.change_selections(
479 SelectionEffects::scroll(Autoscroll::top_relative(8)),
480 window,
481 cx,
482 |selections| {
483 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
484 },
485 );
486 });
487 }
488 }
489
490 pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
491 let parent_message_id = match message.parent_header.as_ref() {
492 Some(header) => &header.msg_id,
493 None => return,
494 };
495
496 match &message.content {
497 JupyterMessageContent::Status(status) => {
498 self.kernel.set_execution_state(&status.execution_state);
499
500 telemetry::event!(
501 "Kernel Status Changed",
502 kernel_language = self.kernel_specification.language(),
503 kernel_status = KernelStatus::from(&self.kernel).to_string(),
504 repl_session_id = cx.entity_id().to_string(),
505 );
506
507 cx.notify();
508 }
509 JupyterMessageContent::KernelInfoReply(reply) => {
510 self.kernel.set_kernel_info(reply);
511 cx.notify();
512 }
513 JupyterMessageContent::UpdateDisplayData(update) => {
514 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
515 display_id
516 } else {
517 return;
518 };
519
520 self.blocks.iter_mut().for_each(|(_, block)| {
521 block.execution_view.update(cx, |execution_view, cx| {
522 execution_view.update_display_data(&update.data, &display_id, window, cx);
523 });
524 });
525 return;
526 }
527 _ => {}
528 }
529
530 if let Some(block) = self.blocks.get_mut(parent_message_id) {
531 block.handle_message(message, window, cx);
532 }
533 }
534
535 pub fn interrupt(&mut self, cx: &mut Context<Self>) {
536 match &mut self.kernel {
537 Kernel::RunningKernel(_kernel) => {
538 self.send(InterruptRequest {}.into(), cx).ok();
539 }
540 Kernel::StartingKernel(_task) => {
541 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
542 }
543 _ => {}
544 }
545 }
546
547 pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
548 if let Kernel::Shutdown = kernel {
549 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
550 }
551
552 let kernel_status = KernelStatus::from(&kernel).to_string();
553 let kernel_language = self.kernel_specification.language();
554
555 telemetry::event!(
556 "Kernel Status Changed",
557 kernel_language,
558 kernel_status,
559 repl_session_id = cx.entity_id().to_string(),
560 );
561
562 self.kernel = kernel;
563 }
564
565 pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
566 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
567
568 match kernel {
569 Kernel::RunningKernel(mut kernel) => {
570 let mut request_tx = kernel.request_tx();
571
572 let forced = kernel.force_shutdown(window, cx);
573
574 cx.spawn(async move |this, cx| {
575 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
576 request_tx.try_send(message).ok();
577
578 forced.await.log_err();
579
580 // Give the kernel a bit of time to clean up
581 cx.background_executor().timer(Duration::from_secs(3)).await;
582
583 this.update(cx, |session, cx| {
584 session.clear_outputs(cx);
585 session.kernel(Kernel::Shutdown, cx);
586 cx.notify();
587 })
588 .ok();
589 })
590 .detach();
591 }
592 _ => {
593 self.kernel(Kernel::Shutdown, cx);
594 }
595 }
596 cx.notify();
597 }
598
599 pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
600 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
601
602 match kernel {
603 Kernel::Restarting => {
604 // Do nothing if already restarting
605 }
606 Kernel::RunningKernel(mut kernel) => {
607 let mut request_tx = kernel.request_tx();
608
609 let forced = kernel.force_shutdown(window, cx);
610
611 cx.spawn_in(window, async move |this, cx| {
612 // Send shutdown request with restart flag
613 log::debug!("restarting kernel");
614 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
615 request_tx.try_send(message).ok();
616
617 // Wait for kernel to shutdown
618 cx.background_executor().timer(Duration::from_secs(1)).await;
619
620 // Force kill the kernel if it hasn't shut down
621 forced.await.log_err();
622
623 // Start a new kernel
624 this.update_in(cx, |session, window, cx| {
625 // TODO: Differentiate between restart and restart+clear-outputs
626 session.clear_outputs(cx);
627 session.start_kernel(window, cx);
628 })
629 .ok();
630 })
631 .detach();
632 }
633 _ => {
634 self.clear_outputs(cx);
635 self.start_kernel(window, cx);
636 }
637 }
638 cx.notify();
639 }
640}
641
642pub enum SessionEvent {
643 Shutdown(WeakEntity<Editor>),
644}
645
646impl EventEmitter<SessionEvent> for Session {}
647
648impl Render for Session {
649 fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
650 let (status_text, interrupt_button) = match &self.kernel {
651 Kernel::RunningKernel(kernel) => (
652 kernel
653 .kernel_info()
654 .as_ref()
655 .map(|info| info.language_info.name.clone()),
656 Some(
657 Button::new("interrupt", "Interrupt")
658 .style(ButtonStyle::Subtle)
659 .on_click(cx.listener(move |session, _, _, cx| {
660 session.interrupt(cx);
661 })),
662 ),
663 ),
664 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
665 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
666 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
667 Kernel::Shutdown => (Some("Shutdown".into()), None),
668 Kernel::Restarting => (Some("Restarting".into()), None),
669 };
670
671 KernelListItem::new(self.kernel_specification.clone())
672 .status_color(match &self.kernel {
673 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
674 ExecutionState::Idle => Color::Success,
675 ExecutionState::Busy => Color::Modified,
676 },
677 Kernel::StartingKernel(_) => Color::Modified,
678 Kernel::ErroredLaunch(_) => Color::Error,
679 Kernel::ShuttingDown => Color::Modified,
680 Kernel::Shutdown => Color::Disabled,
681 Kernel::Restarting => Color::Modified,
682 })
683 .child(Label::new(self.kernel_specification.name()))
684 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
685 .button(
686 Button::new("shutdown", "Shutdown")
687 .style(ButtonStyle::Subtle)
688 .disabled(self.kernel.is_shutting_down())
689 .on_click(cx.listener(move |session, _, window, cx| {
690 session.shutdown(window, cx);
691 })),
692 )
693 .buttons(interrupt_button)
694 }
695}