1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 KernelStatus,
6 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
7 outputs::{ExecutionStatus, ExecutionView},
8};
9use collections::{HashMap, HashSet};
10use editor::{
11 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
12 display_map::{
13 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
14 RenderBlock,
15 },
16 scroll::Autoscroll,
17};
18use futures::FutureExt as _;
19use gpui::{
20 Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
29use theme::ActiveTheme;
30use ui::{IconButtonShape, Tooltip, prelude::*};
31use util::ResultExt as _;
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakEntity<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 pub kernel_specification: KernelSpecification,
39 _buffer_subscription: Subscription,
40}
41
42struct EditorBlock {
43 code_range: Range<Anchor>,
44 invalidation_anchor: Anchor,
45 block_id: CustomBlockId,
46 execution_view: Entity<ExecutionView>,
47}
48
49type CloseBlockFn =
50 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
51
52impl EditorBlock {
53 fn new(
54 editor: WeakEntity<Editor>,
55 code_range: Range<Anchor>,
56 status: ExecutionStatus,
57 on_close: CloseBlockFn,
58 cx: &mut Context<Session>,
59 ) -> anyhow::Result<Self> {
60 let editor = editor
61 .upgrade()
62 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
63 let workspace = editor
64 .read(cx)
65 .workspace()
66 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
67
68 let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
69
70 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
71 let buffer = editor.buffer().clone();
72 let buffer_snapshot = buffer.read(cx).snapshot(cx);
73 let end_point = code_range.end.to_point(&buffer_snapshot);
74 let next_row_start = end_point + Point::new(1, 0);
75 if next_row_start > buffer_snapshot.max_point() {
76 buffer.update(cx, |buffer, cx| {
77 buffer.edit(
78 [(
79 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
80 "\n",
81 )],
82 None,
83 cx,
84 )
85 });
86 }
87
88 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
89 let block = BlockProperties {
90 placement: BlockPlacement::Below(code_range.end),
91 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
92 height: Some(1),
93 style: BlockStyle::Sticky,
94 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
95 priority: 0,
96 render_in_minimap: false,
97 };
98
99 let block_id = editor.insert_blocks([block], None, cx)[0];
100 (block_id, invalidation_anchor)
101 });
102
103 anyhow::Ok(Self {
104 code_range,
105 invalidation_anchor,
106 block_id,
107 execution_view,
108 })
109 }
110
111 fn handle_message(
112 &mut self,
113 message: &JupyterMessage,
114 window: &mut Window,
115 cx: &mut Context<Session>,
116 ) {
117 self.execution_view.update(cx, |execution_view, cx| {
118 execution_view.push_message(&message.content, window, cx);
119 });
120 }
121
122 fn create_output_area_renderer(
123 execution_view: Entity<ExecutionView>,
124 on_close: CloseBlockFn,
125 ) -> RenderBlock {
126 Arc::new(move |cx: &mut BlockContext| {
127 let execution_view = execution_view.clone();
128 let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
129
130 let editor_margins = cx.margins;
131 let gutter = editor_margins.gutter;
132
133 let block_id = cx.block_id;
134 let on_close = on_close.clone();
135
136 let rem_size = cx.window.rem_size();
137
138 let text_line_height = text_style.line_height_in_pixels(rem_size);
139
140 let close_button = h_flex()
141 .flex_none()
142 .items_center()
143 .justify_center()
144 .absolute()
145 .top(text_line_height / 2.)
146 .right(
147 // 2px is a magic number to nudge the button just a bit closer to
148 // the line number start
149 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
150 )
151 .w(text_line_height)
152 .h(text_line_height)
153 .child(
154 IconButton::new("close_output_area", IconName::Close)
155 .icon_size(IconSize::Small)
156 .icon_color(Color::Muted)
157 .size(ButtonSize::Compact)
158 .shape(IconButtonShape::Square)
159 .tooltip(Tooltip::text("Close output area"))
160 .on_click(move |_, window, cx| {
161 if let BlockId::Custom(block_id) = block_id {
162 (on_close)(block_id, window, cx)
163 }
164 }),
165 );
166
167 div()
168 .id(cx.block_id)
169 .block_mouse_down()
170 .flex()
171 .items_start()
172 .min_h(text_line_height)
173 .w_full()
174 .border_y_1()
175 .border_color(cx.theme().colors().border)
176 .bg(cx.theme().colors().background)
177 .child(
178 div()
179 .relative()
180 .w(gutter.full_width())
181 .h(text_line_height * 2)
182 .child(close_button),
183 )
184 .child(
185 div()
186 .flex_1()
187 .size_full()
188 .py(text_line_height / 2.)
189 .mr(editor_margins.right)
190 .pr_2()
191 .child(execution_view),
192 )
193 .into_any_element()
194 })
195 }
196}
197
198impl Session {
199 pub fn new(
200 editor: WeakEntity<Editor>,
201 fs: Arc<dyn Fs>,
202 kernel_specification: KernelSpecification,
203 window: &mut Window,
204 cx: &mut Context<Self>,
205 ) -> Self {
206 let subscription = match editor.upgrade() {
207 Some(editor) => {
208 let buffer = editor.read(cx).buffer().clone();
209 cx.subscribe(&buffer, Self::on_buffer_event)
210 }
211 None => Subscription::new(|| {}),
212 };
213
214 let editor_handle = editor.clone();
215
216 editor
217 .update(cx, |editor, _cx| {
218 setup_editor_session_actions(editor, editor_handle);
219 })
220 .ok();
221
222 let mut session = Self {
223 fs,
224 editor,
225 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
226 blocks: HashMap::default(),
227 kernel_specification,
228 _buffer_subscription: subscription,
229 };
230
231 session.start_kernel(window, cx);
232 session
233 }
234
235 fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
236 let kernel_language = self.kernel_specification.language();
237 let entity_id = self.editor.entity_id();
238 let working_directory = self
239 .editor
240 .upgrade()
241 .and_then(|editor| editor.read(cx).working_directory(cx))
242 .unwrap_or_else(temp_dir);
243
244 telemetry::event!(
245 "Kernel Status Changed",
246 kernel_language,
247 kernel_status = KernelStatus::Starting.to_string(),
248 repl_session_id = cx.entity_id().to_string(),
249 );
250
251 let session_view = cx.entity().clone();
252
253 let kernel = match self.kernel_specification.clone() {
254 KernelSpecification::Jupyter(kernel_specification)
255 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
256 kernel_specification,
257 entity_id,
258 working_directory,
259 self.fs.clone(),
260 session_view,
261 window,
262 cx,
263 ),
264 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
265 remote_kernel_specification,
266 working_directory,
267 session_view,
268 window,
269 cx,
270 ),
271 };
272
273 let pending_kernel = cx
274 .spawn(async move |this, cx| {
275 let kernel = kernel.await;
276
277 match kernel {
278 Ok(kernel) => {
279 this.update(cx, |session, cx| {
280 session.kernel(Kernel::RunningKernel(kernel), cx);
281 })
282 .ok();
283 }
284 Err(err) => {
285 this.update(cx, |session, cx| {
286 session.kernel_errored(err.to_string(), cx);
287 })
288 .ok();
289 }
290 }
291 })
292 .shared();
293
294 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
295 cx.notify();
296 }
297
298 pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
299 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
300
301 self.blocks.values().for_each(|block| {
302 block.execution_view.update(cx, |execution_view, cx| {
303 match execution_view.status {
304 ExecutionStatus::Finished => {
305 // Do nothing when the output was good
306 }
307 _ => {
308 // All other cases, set the status to errored
309 execution_view.status =
310 ExecutionStatus::KernelErrored(error_message.clone())
311 }
312 }
313 cx.notify();
314 });
315 });
316 }
317
318 fn on_buffer_event(
319 &mut self,
320 buffer: Entity<MultiBuffer>,
321 event: &multi_buffer::Event,
322 cx: &mut Context<Self>,
323 ) {
324 if let multi_buffer::Event::Edited { .. } = event {
325 let snapshot = buffer.read(cx).snapshot(cx);
326
327 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
328
329 self.blocks.retain(|_id, block| {
330 if block.invalidation_anchor.is_valid(&snapshot) {
331 true
332 } else {
333 blocks_to_remove.insert(block.block_id);
334 false
335 }
336 });
337
338 if !blocks_to_remove.is_empty() {
339 self.editor
340 .update(cx, |editor, cx| {
341 editor.remove_blocks(blocks_to_remove, None, cx);
342 })
343 .ok();
344 cx.notify();
345 }
346 }
347 }
348
349 fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
350 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
351 kernel.request_tx().try_send(message).ok();
352 }
353
354 anyhow::Ok(())
355 }
356
357 pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
358 let blocks_to_remove: HashSet<CustomBlockId> =
359 self.blocks.values().map(|block| block.block_id).collect();
360
361 self.editor
362 .update(cx, |editor, cx| {
363 editor.remove_blocks(blocks_to_remove, None, cx);
364 })
365 .ok();
366
367 self.blocks.clear();
368 }
369
370 pub fn execute(
371 &mut self,
372 code: String,
373 anchor_range: Range<Anchor>,
374 next_cell: Option<Anchor>,
375 move_down: bool,
376 window: &mut Window,
377 cx: &mut Context<Self>,
378 ) {
379 let Some(editor) = self.editor.upgrade() else {
380 return;
381 };
382
383 if code.is_empty() {
384 return;
385 }
386
387 let execute_request = ExecuteRequest {
388 code,
389 ..ExecuteRequest::default()
390 };
391
392 let message: JupyterMessage = execute_request.into();
393
394 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
395
396 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
397
398 self.blocks.retain(|_key, block| {
399 if anchor_range.overlaps(&block.code_range, &buffer) {
400 blocks_to_remove.insert(block.block_id);
401 false
402 } else {
403 true
404 }
405 });
406
407 self.editor
408 .update(cx, |editor, cx| {
409 editor.remove_blocks(blocks_to_remove, None, cx);
410 })
411 .ok();
412
413 let status = match &self.kernel {
414 Kernel::Restarting => ExecutionStatus::Restarting,
415 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
416 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
417 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
418 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
419 Kernel::Shutdown => ExecutionStatus::Shutdown,
420 };
421
422 let parent_message_id = message.header.msg_id.clone();
423 let session_view = cx.entity().downgrade();
424 let weak_editor = self.editor.clone();
425
426 let on_close: CloseBlockFn = Arc::new(
427 move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
428 if let Some(session) = session_view.upgrade() {
429 session.update(cx, |session, cx| {
430 session.blocks.remove(&parent_message_id);
431 cx.notify();
432 });
433 }
434
435 if let Some(editor) = weak_editor.upgrade() {
436 editor.update(cx, |editor, cx| {
437 let mut block_ids = HashSet::default();
438 block_ids.insert(block_id);
439 editor.remove_blocks(block_ids, None, cx);
440 });
441 }
442 },
443 );
444
445 let Ok(editor_block) =
446 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
447 else {
448 return;
449 };
450
451 let new_cursor_pos = if let Some(next_cursor) = next_cell {
452 next_cursor
453 } else {
454 editor_block.invalidation_anchor
455 };
456
457 self.blocks
458 .insert(message.header.msg_id.clone(), editor_block);
459
460 match &self.kernel {
461 Kernel::RunningKernel(_) => {
462 self.send(message, cx).ok();
463 }
464 Kernel::StartingKernel(task) => {
465 // Queue up the execution as a task to run after the kernel starts
466 let task = task.clone();
467 let message = message.clone();
468
469 cx.spawn(async move |this, cx| {
470 task.await;
471 this.update(cx, |session, cx| {
472 session.send(message, cx).ok();
473 })
474 .ok();
475 })
476 .detach();
477 }
478 _ => {}
479 }
480
481 if move_down {
482 editor.update(cx, move |editor, cx| {
483 editor.change_selections(
484 Some(Autoscroll::top_relative(8)),
485 window,
486 cx,
487 |selections| {
488 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
489 },
490 );
491 });
492 }
493 }
494
495 pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
496 let parent_message_id = match message.parent_header.as_ref() {
497 Some(header) => &header.msg_id,
498 None => return,
499 };
500
501 match &message.content {
502 JupyterMessageContent::Status(status) => {
503 self.kernel.set_execution_state(&status.execution_state);
504
505 telemetry::event!(
506 "Kernel Status Changed",
507 kernel_language = self.kernel_specification.language(),
508 kernel_status = KernelStatus::from(&self.kernel).to_string(),
509 repl_session_id = cx.entity_id().to_string(),
510 );
511
512 cx.notify();
513 }
514 JupyterMessageContent::KernelInfoReply(reply) => {
515 self.kernel.set_kernel_info(reply);
516 cx.notify();
517 }
518 JupyterMessageContent::UpdateDisplayData(update) => {
519 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
520 display_id
521 } else {
522 return;
523 };
524
525 self.blocks.iter_mut().for_each(|(_, block)| {
526 block.execution_view.update(cx, |execution_view, cx| {
527 execution_view.update_display_data(&update.data, &display_id, window, cx);
528 });
529 });
530 return;
531 }
532 _ => {}
533 }
534
535 if let Some(block) = self.blocks.get_mut(parent_message_id) {
536 block.handle_message(message, window, cx);
537 }
538 }
539
540 pub fn interrupt(&mut self, cx: &mut Context<Self>) {
541 match &mut self.kernel {
542 Kernel::RunningKernel(_kernel) => {
543 self.send(InterruptRequest {}.into(), cx).ok();
544 }
545 Kernel::StartingKernel(_task) => {
546 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
547 }
548 _ => {}
549 }
550 }
551
552 pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
553 if let Kernel::Shutdown = kernel {
554 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
555 }
556
557 let kernel_status = KernelStatus::from(&kernel).to_string();
558 let kernel_language = self.kernel_specification.language();
559
560 telemetry::event!(
561 "Kernel Status Changed",
562 kernel_language,
563 kernel_status,
564 repl_session_id = cx.entity_id().to_string(),
565 );
566
567 self.kernel = kernel;
568 }
569
570 pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
571 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
572
573 match kernel {
574 Kernel::RunningKernel(mut kernel) => {
575 let mut request_tx = kernel.request_tx().clone();
576
577 let forced = kernel.force_shutdown(window, cx);
578
579 cx.spawn(async move |this, cx| {
580 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
581 request_tx.try_send(message).ok();
582
583 forced.await.log_err();
584
585 // Give the kernel a bit of time to clean up
586 cx.background_executor().timer(Duration::from_secs(3)).await;
587
588 this.update(cx, |session, cx| {
589 session.clear_outputs(cx);
590 session.kernel(Kernel::Shutdown, cx);
591 cx.notify();
592 })
593 .ok();
594 })
595 .detach();
596 }
597 _ => {
598 self.kernel(Kernel::Shutdown, cx);
599 }
600 }
601 cx.notify();
602 }
603
604 pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
605 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
606
607 match kernel {
608 Kernel::Restarting => {
609 // Do nothing if already restarting
610 }
611 Kernel::RunningKernel(mut kernel) => {
612 let mut request_tx = kernel.request_tx().clone();
613
614 let forced = kernel.force_shutdown(window, cx);
615
616 cx.spawn_in(window, async move |this, cx| {
617 // Send shutdown request with restart flag
618 log::debug!("restarting kernel");
619 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
620 request_tx.try_send(message).ok();
621
622 // Wait for kernel to shutdown
623 cx.background_executor().timer(Duration::from_secs(1)).await;
624
625 // Force kill the kernel if it hasn't shut down
626 forced.await.log_err();
627
628 // Start a new kernel
629 this.update_in(cx, |session, window, cx| {
630 // TODO: Differentiate between restart and restart+clear-outputs
631 session.clear_outputs(cx);
632 session.start_kernel(window, cx);
633 })
634 .ok();
635 })
636 .detach();
637 }
638 _ => {
639 self.clear_outputs(cx);
640 self.start_kernel(window, cx);
641 }
642 }
643 cx.notify();
644 }
645}
646
647pub enum SessionEvent {
648 Shutdown(WeakEntity<Editor>),
649}
650
651impl EventEmitter<SessionEvent> for Session {}
652
653impl Render for Session {
654 fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
655 let (status_text, interrupt_button) = match &self.kernel {
656 Kernel::RunningKernel(kernel) => (
657 kernel
658 .kernel_info()
659 .as_ref()
660 .map(|info| info.language_info.name.clone()),
661 Some(
662 Button::new("interrupt", "Interrupt")
663 .style(ButtonStyle::Subtle)
664 .on_click(cx.listener(move |session, _, _, cx| {
665 session.interrupt(cx);
666 })),
667 ),
668 ),
669 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
670 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
671 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
672 Kernel::Shutdown => (Some("Shutdown".into()), None),
673 Kernel::Restarting => (Some("Restarting".into()), None),
674 };
675
676 KernelListItem::new(self.kernel_specification.clone())
677 .status_color(match &self.kernel {
678 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
679 ExecutionState::Idle => Color::Success,
680 ExecutionState::Busy => Color::Modified,
681 },
682 Kernel::StartingKernel(_) => Color::Modified,
683 Kernel::ErroredLaunch(_) => Color::Error,
684 Kernel::ShuttingDown => Color::Modified,
685 Kernel::Shutdown => Color::Disabled,
686 Kernel::Restarting => Color::Modified,
687 })
688 .child(Label::new(self.kernel_specification.name()))
689 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
690 .button(
691 Button::new("shutdown", "Shutdown")
692 .style(ButtonStyle::Subtle)
693 .disabled(self.kernel.is_shutting_down())
694 .on_click(cx.listener(move |session, _, window, cx| {
695 session.shutdown(window, cx);
696 })),
697 )
698 .buttons(interrupt_button)
699 }
700}