1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 KernelStatus,
6 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
7 outputs::{ExecutionStatus, ExecutionView},
8};
9use anyhow::Context as _;
10use collections::{HashMap, HashSet};
11use editor::SelectionEffects;
12use editor::{
13 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
14 display_map::{
15 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
16 RenderBlock,
17 },
18 scroll::Autoscroll,
19};
20use futures::FutureExt as _;
21use gpui::{
22 Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
23};
24use language::Point;
25use project::Fs;
26use runtimelib::{
27 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
28 ShutdownRequest,
29};
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::ActiveTheme;
32use ui::{IconButtonShape, Tooltip, prelude::*};
33use util::ResultExt as _;
34
35pub struct Session {
36 fs: Arc<dyn Fs>,
37 editor: WeakEntity<Editor>,
38 pub kernel: Kernel,
39 blocks: HashMap<String, EditorBlock>,
40 pub kernel_specification: KernelSpecification,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: Entity<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakEntity<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut Context<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor.upgrade().context("editor is not open")?;
63 let workspace = editor.read(cx).workspace().context("workspace dropped")?;
64
65 let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
66
67 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
68 let buffer = editor.buffer().clone();
69 let buffer_snapshot = buffer.read(cx).snapshot(cx);
70 let end_point = code_range.end.to_point(&buffer_snapshot);
71 let next_row_start = end_point + Point::new(1, 0);
72 if next_row_start > buffer_snapshot.max_point() {
73 buffer.update(cx, |buffer, cx| {
74 buffer.edit(
75 [(
76 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
77 "\n",
78 )],
79 None,
80 cx,
81 )
82 });
83 }
84
85 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
86 let block = BlockProperties {
87 placement: BlockPlacement::Below(code_range.end),
88 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
89 height: Some(1),
90 style: BlockStyle::Sticky,
91 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
92 priority: 0,
93 render_in_minimap: false,
94 };
95
96 let block_id = editor.insert_blocks([block], None, cx)[0];
97 (block_id, invalidation_anchor)
98 });
99
100 anyhow::Ok(Self {
101 code_range,
102 invalidation_anchor,
103 block_id,
104 execution_view,
105 })
106 }
107
108 fn handle_message(
109 &mut self,
110 message: &JupyterMessage,
111 window: &mut Window,
112 cx: &mut Context<Session>,
113 ) {
114 self.execution_view.update(cx, |execution_view, cx| {
115 execution_view.push_message(&message.content, window, cx);
116 });
117 }
118
119 fn create_output_area_renderer(
120 execution_view: Entity<ExecutionView>,
121 on_close: CloseBlockFn,
122 ) -> RenderBlock {
123 Arc::new(move |cx: &mut BlockContext| {
124 let execution_view = execution_view.clone();
125 let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
126
127 let editor_margins = cx.margins;
128 let gutter = editor_margins.gutter;
129
130 let block_id = cx.block_id;
131 let on_close = on_close.clone();
132
133 let rem_size = cx.window.rem_size();
134
135 let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137 let close_button = h_flex()
138 .flex_none()
139 .items_center()
140 .justify_center()
141 .absolute()
142 .top(text_line_height / 2.)
143 .right(
144 // 2px is a magic number to nudge the button just a bit closer to
145 // the line number start
146 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147 )
148 .w(text_line_height)
149 .h(text_line_height)
150 .child(
151 IconButton::new("close_output_area", IconName::Close)
152 .icon_size(IconSize::Small)
153 .icon_color(Color::Muted)
154 .size(ButtonSize::Compact)
155 .shape(IconButtonShape::Square)
156 .tooltip(Tooltip::text("Close output area"))
157 .on_click(move |_, window, cx| {
158 if let BlockId::Custom(block_id) = block_id {
159 (on_close)(block_id, window, cx)
160 }
161 }),
162 );
163
164 div()
165 .id(cx.block_id)
166 .block_mouse_except_scroll()
167 .flex()
168 .items_start()
169 .min_h(text_line_height)
170 .w_full()
171 .border_y_1()
172 .border_color(cx.theme().colors().border)
173 .bg(cx.theme().colors().background)
174 .child(
175 div()
176 .relative()
177 .w(gutter.full_width())
178 .h(text_line_height * 2)
179 .child(close_button),
180 )
181 .child(
182 div()
183 .flex_1()
184 .size_full()
185 .py(text_line_height / 2.)
186 .mr(editor_margins.right)
187 .pr_2()
188 .child(execution_view),
189 )
190 .into_any_element()
191 })
192 }
193}
194
195impl Session {
196 pub fn new(
197 editor: WeakEntity<Editor>,
198 fs: Arc<dyn Fs>,
199 kernel_specification: KernelSpecification,
200 window: &mut Window,
201 cx: &mut Context<Self>,
202 ) -> Self {
203 let subscription = match editor.upgrade() {
204 Some(editor) => {
205 let buffer = editor.read(cx).buffer().clone();
206 cx.subscribe(&buffer, Self::on_buffer_event)
207 }
208 None => Subscription::new(|| {}),
209 };
210
211 let editor_handle = editor.clone();
212
213 editor
214 .update(cx, |editor, _cx| {
215 setup_editor_session_actions(editor, editor_handle);
216 })
217 .ok();
218
219 let mut session = Self {
220 fs,
221 editor,
222 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
223 blocks: HashMap::default(),
224 kernel_specification,
225 _buffer_subscription: subscription,
226 };
227
228 session.start_kernel(window, cx);
229 session
230 }
231
232 fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
233 let kernel_language = self.kernel_specification.language();
234 let entity_id = self.editor.entity_id();
235 let working_directory = self
236 .editor
237 .upgrade()
238 .and_then(|editor| editor.read(cx).working_directory(cx))
239 .unwrap_or_else(temp_dir);
240
241 telemetry::event!(
242 "Kernel Status Changed",
243 kernel_language,
244 kernel_status = KernelStatus::Starting.to_string(),
245 repl_session_id = cx.entity_id().to_string(),
246 );
247
248 let session_view = cx.entity().clone();
249
250 let kernel = match self.kernel_specification.clone() {
251 KernelSpecification::Jupyter(kernel_specification)
252 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
253 kernel_specification,
254 entity_id,
255 working_directory,
256 self.fs.clone(),
257 session_view,
258 window,
259 cx,
260 ),
261 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
262 remote_kernel_specification,
263 working_directory,
264 session_view,
265 window,
266 cx,
267 ),
268 };
269
270 let pending_kernel = cx
271 .spawn(async move |this, cx| {
272 let kernel = kernel.await;
273
274 match kernel {
275 Ok(kernel) => {
276 this.update(cx, |session, cx| {
277 session.kernel(Kernel::RunningKernel(kernel), cx);
278 })
279 .ok();
280 }
281 Err(err) => {
282 this.update(cx, |session, cx| {
283 session.kernel_errored(err.to_string(), cx);
284 })
285 .ok();
286 }
287 }
288 })
289 .shared();
290
291 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
292 cx.notify();
293 }
294
295 pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
296 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
297
298 self.blocks.values().for_each(|block| {
299 block.execution_view.update(cx, |execution_view, cx| {
300 match execution_view.status {
301 ExecutionStatus::Finished => {
302 // Do nothing when the output was good
303 }
304 _ => {
305 // All other cases, set the status to errored
306 execution_view.status =
307 ExecutionStatus::KernelErrored(error_message.clone())
308 }
309 }
310 cx.notify();
311 });
312 });
313 }
314
315 fn on_buffer_event(
316 &mut self,
317 buffer: Entity<MultiBuffer>,
318 event: &multi_buffer::Event,
319 cx: &mut Context<Self>,
320 ) {
321 if let multi_buffer::Event::Edited { .. } = event {
322 let snapshot = buffer.read(cx).snapshot(cx);
323
324 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
325
326 self.blocks.retain(|_id, block| {
327 if block.invalidation_anchor.is_valid(&snapshot) {
328 true
329 } else {
330 blocks_to_remove.insert(block.block_id);
331 false
332 }
333 });
334
335 if !blocks_to_remove.is_empty() {
336 self.editor
337 .update(cx, |editor, cx| {
338 editor.remove_blocks(blocks_to_remove, None, cx);
339 })
340 .ok();
341 cx.notify();
342 }
343 }
344 }
345
346 fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
347 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
348 kernel.request_tx().try_send(message).ok();
349 }
350
351 anyhow::Ok(())
352 }
353
354 pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
355 let blocks_to_remove: HashSet<CustomBlockId> =
356 self.blocks.values().map(|block| block.block_id).collect();
357
358 self.editor
359 .update(cx, |editor, cx| {
360 editor.remove_blocks(blocks_to_remove, None, cx);
361 })
362 .ok();
363
364 self.blocks.clear();
365 }
366
367 pub fn execute(
368 &mut self,
369 code: String,
370 anchor_range: Range<Anchor>,
371 next_cell: Option<Anchor>,
372 move_down: bool,
373 window: &mut Window,
374 cx: &mut Context<Self>,
375 ) {
376 let Some(editor) = self.editor.upgrade() else {
377 return;
378 };
379
380 if code.is_empty() {
381 return;
382 }
383
384 let execute_request = ExecuteRequest {
385 code,
386 ..ExecuteRequest::default()
387 };
388
389 let message: JupyterMessage = execute_request.into();
390
391 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
392
393 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
394
395 self.blocks.retain(|_key, block| {
396 if anchor_range.overlaps(&block.code_range, &buffer) {
397 blocks_to_remove.insert(block.block_id);
398 false
399 } else {
400 true
401 }
402 });
403
404 self.editor
405 .update(cx, |editor, cx| {
406 editor.remove_blocks(blocks_to_remove, None, cx);
407 })
408 .ok();
409
410 let status = match &self.kernel {
411 Kernel::Restarting => ExecutionStatus::Restarting,
412 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
413 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
414 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
415 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
416 Kernel::Shutdown => ExecutionStatus::Shutdown,
417 };
418
419 let parent_message_id = message.header.msg_id.clone();
420 let session_view = cx.entity().downgrade();
421 let weak_editor = self.editor.clone();
422
423 let on_close: CloseBlockFn = Arc::new(
424 move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
425 if let Some(session) = session_view.upgrade() {
426 session.update(cx, |session, cx| {
427 session.blocks.remove(&parent_message_id);
428 cx.notify();
429 });
430 }
431
432 if let Some(editor) = weak_editor.upgrade() {
433 editor.update(cx, |editor, cx| {
434 let mut block_ids = HashSet::default();
435 block_ids.insert(block_id);
436 editor.remove_blocks(block_ids, None, cx);
437 });
438 }
439 },
440 );
441
442 let Ok(editor_block) =
443 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
444 else {
445 return;
446 };
447
448 let new_cursor_pos = if let Some(next_cursor) = next_cell {
449 next_cursor
450 } else {
451 editor_block.invalidation_anchor
452 };
453
454 self.blocks
455 .insert(message.header.msg_id.clone(), editor_block);
456
457 match &self.kernel {
458 Kernel::RunningKernel(_) => {
459 self.send(message, cx).ok();
460 }
461 Kernel::StartingKernel(task) => {
462 // Queue up the execution as a task to run after the kernel starts
463 let task = task.clone();
464 let message = message.clone();
465
466 cx.spawn(async move |this, cx| {
467 task.await;
468 this.update(cx, |session, cx| {
469 session.send(message, cx).ok();
470 })
471 .ok();
472 })
473 .detach();
474 }
475 _ => {}
476 }
477
478 if move_down {
479 editor.update(cx, move |editor, cx| {
480 editor.change_selections(
481 SelectionEffects::scroll(Autoscroll::top_relative(8)),
482 window,
483 cx,
484 |selections| {
485 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
486 },
487 );
488 });
489 }
490 }
491
492 pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
493 let parent_message_id = match message.parent_header.as_ref() {
494 Some(header) => &header.msg_id,
495 None => return,
496 };
497
498 match &message.content {
499 JupyterMessageContent::Status(status) => {
500 self.kernel.set_execution_state(&status.execution_state);
501
502 telemetry::event!(
503 "Kernel Status Changed",
504 kernel_language = self.kernel_specification.language(),
505 kernel_status = KernelStatus::from(&self.kernel).to_string(),
506 repl_session_id = cx.entity_id().to_string(),
507 );
508
509 cx.notify();
510 }
511 JupyterMessageContent::KernelInfoReply(reply) => {
512 self.kernel.set_kernel_info(reply);
513 cx.notify();
514 }
515 JupyterMessageContent::UpdateDisplayData(update) => {
516 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
517 display_id
518 } else {
519 return;
520 };
521
522 self.blocks.iter_mut().for_each(|(_, block)| {
523 block.execution_view.update(cx, |execution_view, cx| {
524 execution_view.update_display_data(&update.data, &display_id, window, cx);
525 });
526 });
527 return;
528 }
529 _ => {}
530 }
531
532 if let Some(block) = self.blocks.get_mut(parent_message_id) {
533 block.handle_message(message, window, cx);
534 }
535 }
536
537 pub fn interrupt(&mut self, cx: &mut Context<Self>) {
538 match &mut self.kernel {
539 Kernel::RunningKernel(_kernel) => {
540 self.send(InterruptRequest {}.into(), cx).ok();
541 }
542 Kernel::StartingKernel(_task) => {
543 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
544 }
545 _ => {}
546 }
547 }
548
549 pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
550 if let Kernel::Shutdown = kernel {
551 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
552 }
553
554 let kernel_status = KernelStatus::from(&kernel).to_string();
555 let kernel_language = self.kernel_specification.language();
556
557 telemetry::event!(
558 "Kernel Status Changed",
559 kernel_language,
560 kernel_status,
561 repl_session_id = cx.entity_id().to_string(),
562 );
563
564 self.kernel = kernel;
565 }
566
567 pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
568 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
569
570 match kernel {
571 Kernel::RunningKernel(mut kernel) => {
572 let mut request_tx = kernel.request_tx().clone();
573
574 let forced = kernel.force_shutdown(window, cx);
575
576 cx.spawn(async move |this, cx| {
577 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
578 request_tx.try_send(message).ok();
579
580 forced.await.log_err();
581
582 // Give the kernel a bit of time to clean up
583 cx.background_executor().timer(Duration::from_secs(3)).await;
584
585 this.update(cx, |session, cx| {
586 session.clear_outputs(cx);
587 session.kernel(Kernel::Shutdown, cx);
588 cx.notify();
589 })
590 .ok();
591 })
592 .detach();
593 }
594 _ => {
595 self.kernel(Kernel::Shutdown, cx);
596 }
597 }
598 cx.notify();
599 }
600
601 pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
602 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
603
604 match kernel {
605 Kernel::Restarting => {
606 // Do nothing if already restarting
607 }
608 Kernel::RunningKernel(mut kernel) => {
609 let mut request_tx = kernel.request_tx().clone();
610
611 let forced = kernel.force_shutdown(window, cx);
612
613 cx.spawn_in(window, async move |this, cx| {
614 // Send shutdown request with restart flag
615 log::debug!("restarting kernel");
616 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
617 request_tx.try_send(message).ok();
618
619 // Wait for kernel to shutdown
620 cx.background_executor().timer(Duration::from_secs(1)).await;
621
622 // Force kill the kernel if it hasn't shut down
623 forced.await.log_err();
624
625 // Start a new kernel
626 this.update_in(cx, |session, window, cx| {
627 // TODO: Differentiate between restart and restart+clear-outputs
628 session.clear_outputs(cx);
629 session.start_kernel(window, cx);
630 })
631 .ok();
632 })
633 .detach();
634 }
635 _ => {
636 self.clear_outputs(cx);
637 self.start_kernel(window, cx);
638 }
639 }
640 cx.notify();
641 }
642}
643
644pub enum SessionEvent {
645 Shutdown(WeakEntity<Editor>),
646}
647
648impl EventEmitter<SessionEvent> for Session {}
649
650impl Render for Session {
651 fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
652 let (status_text, interrupt_button) = match &self.kernel {
653 Kernel::RunningKernel(kernel) => (
654 kernel
655 .kernel_info()
656 .as_ref()
657 .map(|info| info.language_info.name.clone()),
658 Some(
659 Button::new("interrupt", "Interrupt")
660 .style(ButtonStyle::Subtle)
661 .on_click(cx.listener(move |session, _, _, cx| {
662 session.interrupt(cx);
663 })),
664 ),
665 ),
666 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
667 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
668 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
669 Kernel::Shutdown => (Some("Shutdown".into()), None),
670 Kernel::Restarting => (Some("Restarting".into()), None),
671 };
672
673 KernelListItem::new(self.kernel_specification.clone())
674 .status_color(match &self.kernel {
675 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
676 ExecutionState::Idle => Color::Success,
677 ExecutionState::Busy => Color::Modified,
678 },
679 Kernel::StartingKernel(_) => Color::Modified,
680 Kernel::ErroredLaunch(_) => Color::Error,
681 Kernel::ShuttingDown => Color::Modified,
682 Kernel::Shutdown => Color::Disabled,
683 Kernel::Restarting => Color::Modified,
684 })
685 .child(Label::new(self.kernel_specification.name()))
686 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
687 .button(
688 Button::new("shutdown", "Shutdown")
689 .style(ButtonStyle::Subtle)
690 .disabled(self.kernel.is_shutting_down())
691 .on_click(cx.listener(move |session, _, window, cx| {
692 session.shutdown(window, cx);
693 })),
694 )
695 .buttons(interrupt_button)
696 }
697}