1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 KernelStatus,
6 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
7 outputs::{ExecutionStatus, ExecutionView},
8};
9use anyhow::Context as _;
10use collections::{HashMap, HashSet};
11use editor::SelectionEffects;
12use editor::{
13 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
14 display_map::{
15 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
16 RenderBlock,
17 },
18 scroll::Autoscroll,
19};
20use futures::FutureExt as _;
21use gpui::{
22 Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
23};
24use language::Point;
25use project::Fs;
26use runtimelib::{
27 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
28 ShutdownRequest,
29};
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::ActiveTheme;
32use ui::{Tooltip, prelude::*};
33use util::ResultExt as _;
34
35pub struct Session {
36 fs: Arc<dyn Fs>,
37 editor: WeakEntity<Editor>,
38 pub kernel: Kernel,
39 blocks: HashMap<String, EditorBlock>,
40 pub kernel_specification: KernelSpecification,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: Entity<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakEntity<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut Context<Session>,
61 ) -> anyhow::Result<Self> {
62 let editor = editor.upgrade().context("editor is not open")?;
63 let workspace = editor.read(cx).workspace().context("workspace dropped")?;
64
65 let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
66
67 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
68 let buffer = editor.buffer().clone();
69 let buffer_snapshot = buffer.read(cx).snapshot(cx);
70 let end_point = code_range.end.to_point(&buffer_snapshot);
71 let next_row_start = end_point + Point::new(1, 0);
72 if next_row_start > buffer_snapshot.max_point() {
73 buffer.update(cx, |buffer, cx| {
74 buffer.edit(
75 [(
76 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
77 "\n",
78 )],
79 None,
80 cx,
81 )
82 });
83 }
84
85 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
86 let block = BlockProperties {
87 placement: BlockPlacement::Below(code_range.end),
88 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
89 height: Some(1),
90 style: BlockStyle::Sticky,
91 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
92 priority: 0,
93 };
94
95 let block_id = editor.insert_blocks([block], None, cx)[0];
96 (block_id, invalidation_anchor)
97 });
98
99 anyhow::Ok(Self {
100 code_range,
101 invalidation_anchor,
102 block_id,
103 execution_view,
104 })
105 }
106
107 fn handle_message(
108 &mut self,
109 message: &JupyterMessage,
110 window: &mut Window,
111 cx: &mut Context<Session>,
112 ) {
113 self.execution_view.update(cx, |execution_view, cx| {
114 execution_view.push_message(&message.content, window, cx);
115 });
116 }
117
118 fn create_output_area_renderer(
119 execution_view: Entity<ExecutionView>,
120 on_close: CloseBlockFn,
121 ) -> RenderBlock {
122 Arc::new(move |cx: &mut BlockContext| {
123 let execution_view = execution_view.clone();
124 let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
125
126 let editor_margins = cx.margins;
127 let gutter = editor_margins.gutter;
128
129 let block_id = cx.block_id;
130 let on_close = on_close.clone();
131
132 let rem_size = cx.window.rem_size();
133
134 let text_line_height = text_style.line_height_in_pixels(rem_size);
135
136 let close_button = h_flex()
137 .flex_none()
138 .items_center()
139 .justify_center()
140 .absolute()
141 .top(text_line_height / 2.)
142 .right(
143 // 2px is a magic number to nudge the button just a bit closer to
144 // the line number start
145 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
146 )
147 .w(text_line_height)
148 .h(text_line_height)
149 .child(
150 IconButton::new("close_output_area", IconName::Close)
151 .icon_size(IconSize::Small)
152 .icon_color(Color::Muted)
153 .size(ButtonSize::Compact)
154 .tooltip(Tooltip::text("Close output area"))
155 .on_click(move |_, window, cx| {
156 if let BlockId::Custom(block_id) = block_id {
157 (on_close)(block_id, window, cx)
158 }
159 }),
160 );
161
162 div()
163 .id(cx.block_id)
164 .block_mouse_except_scroll()
165 .flex()
166 .items_start()
167 .min_h(text_line_height)
168 .w_full()
169 .border_y_1()
170 .border_color(cx.theme().colors().border)
171 .bg(cx.theme().colors().background)
172 .child(
173 div()
174 .relative()
175 .w(gutter.full_width())
176 .h(text_line_height * 2)
177 .child(close_button),
178 )
179 .child(
180 div()
181 .flex_1()
182 .size_full()
183 .py(text_line_height / 2.)
184 .mr(editor_margins.right)
185 .pr_2()
186 .child(execution_view),
187 )
188 .into_any_element()
189 })
190 }
191}
192
193impl Session {
194 pub fn new(
195 editor: WeakEntity<Editor>,
196 fs: Arc<dyn Fs>,
197 kernel_specification: KernelSpecification,
198 window: &mut Window,
199 cx: &mut Context<Self>,
200 ) -> Self {
201 let subscription = match editor.upgrade() {
202 Some(editor) => {
203 let buffer = editor.read(cx).buffer().clone();
204 cx.subscribe(&buffer, Self::on_buffer_event)
205 }
206 None => Subscription::new(|| {}),
207 };
208
209 let editor_handle = editor.clone();
210
211 editor
212 .update(cx, |editor, _cx| {
213 setup_editor_session_actions(editor, editor_handle);
214 })
215 .ok();
216
217 let mut session = Self {
218 fs,
219 editor,
220 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
221 blocks: HashMap::default(),
222 kernel_specification,
223 _buffer_subscription: subscription,
224 };
225
226 session.start_kernel(window, cx);
227 session
228 }
229
230 fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
231 let kernel_language = self.kernel_specification.language();
232 let entity_id = self.editor.entity_id();
233 let working_directory = self
234 .editor
235 .upgrade()
236 .and_then(|editor| editor.read(cx).working_directory(cx))
237 .unwrap_or_else(temp_dir);
238
239 telemetry::event!(
240 "Kernel Status Changed",
241 kernel_language,
242 kernel_status = KernelStatus::Starting.to_string(),
243 repl_session_id = cx.entity_id().to_string(),
244 );
245
246 let session_view = cx.entity();
247
248 let kernel = match self.kernel_specification.clone() {
249 KernelSpecification::Jupyter(kernel_specification)
250 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
251 kernel_specification,
252 entity_id,
253 working_directory,
254 self.fs.clone(),
255 session_view,
256 window,
257 cx,
258 ),
259 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
260 remote_kernel_specification,
261 working_directory,
262 session_view,
263 window,
264 cx,
265 ),
266 };
267
268 let pending_kernel = cx
269 .spawn(async move |this, cx| {
270 let kernel = kernel.await;
271
272 match kernel {
273 Ok(kernel) => {
274 this.update(cx, |session, cx| {
275 session.kernel(Kernel::RunningKernel(kernel), cx);
276 })
277 .ok();
278 }
279 Err(err) => {
280 this.update(cx, |session, cx| {
281 session.kernel_errored(err.to_string(), cx);
282 })
283 .ok();
284 }
285 }
286 })
287 .shared();
288
289 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
290 cx.notify();
291 }
292
293 pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
294 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
295
296 self.blocks.values().for_each(|block| {
297 block.execution_view.update(cx, |execution_view, cx| {
298 match execution_view.status {
299 ExecutionStatus::Finished => {
300 // Do nothing when the output was good
301 }
302 _ => {
303 // All other cases, set the status to errored
304 execution_view.status =
305 ExecutionStatus::KernelErrored(error_message.clone())
306 }
307 }
308 cx.notify();
309 });
310 });
311 }
312
313 fn on_buffer_event(
314 &mut self,
315 buffer: Entity<MultiBuffer>,
316 event: &multi_buffer::Event,
317 cx: &mut Context<Self>,
318 ) {
319 if let multi_buffer::Event::Edited { .. } = event {
320 let snapshot = buffer.read(cx).snapshot(cx);
321
322 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
323
324 self.blocks.retain(|_id, block| {
325 if block.invalidation_anchor.is_valid(&snapshot) {
326 true
327 } else {
328 blocks_to_remove.insert(block.block_id);
329 false
330 }
331 });
332
333 if !blocks_to_remove.is_empty() {
334 self.editor
335 .update(cx, |editor, cx| {
336 editor.remove_blocks(blocks_to_remove, None, cx);
337 })
338 .ok();
339 cx.notify();
340 }
341 }
342 }
343
344 fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
345 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
346 kernel.request_tx().try_send(message).ok();
347 }
348
349 anyhow::Ok(())
350 }
351
352 pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
353 let blocks_to_remove: HashSet<CustomBlockId> =
354 self.blocks.values().map(|block| block.block_id).collect();
355
356 self.editor
357 .update(cx, |editor, cx| {
358 editor.remove_blocks(blocks_to_remove, None, cx);
359 })
360 .ok();
361
362 self.blocks.clear();
363 }
364
365 pub fn execute(
366 &mut self,
367 code: String,
368 anchor_range: Range<Anchor>,
369 next_cell: Option<Anchor>,
370 move_down: bool,
371 window: &mut Window,
372 cx: &mut Context<Self>,
373 ) {
374 let Some(editor) = self.editor.upgrade() else {
375 return;
376 };
377
378 if code.is_empty() {
379 return;
380 }
381
382 let execute_request = ExecuteRequest {
383 code,
384 ..ExecuteRequest::default()
385 };
386
387 let message: JupyterMessage = execute_request.into();
388
389 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
390
391 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
392
393 self.blocks.retain(|_key, block| {
394 if anchor_range.overlaps(&block.code_range, &buffer) {
395 blocks_to_remove.insert(block.block_id);
396 false
397 } else {
398 true
399 }
400 });
401
402 self.editor
403 .update(cx, |editor, cx| {
404 editor.remove_blocks(blocks_to_remove, None, cx);
405 })
406 .ok();
407
408 let status = match &self.kernel {
409 Kernel::Restarting => ExecutionStatus::Restarting,
410 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
411 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
412 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
413 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
414 Kernel::Shutdown => ExecutionStatus::Shutdown,
415 };
416
417 let parent_message_id = message.header.msg_id.clone();
418 let session_view = cx.entity().downgrade();
419 let weak_editor = self.editor.clone();
420
421 let on_close: CloseBlockFn = Arc::new(
422 move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
423 if let Some(session) = session_view.upgrade() {
424 session.update(cx, |session, cx| {
425 session.blocks.remove(&parent_message_id);
426 cx.notify();
427 });
428 }
429
430 if let Some(editor) = weak_editor.upgrade() {
431 editor.update(cx, |editor, cx| {
432 let mut block_ids = HashSet::default();
433 block_ids.insert(block_id);
434 editor.remove_blocks(block_ids, None, cx);
435 });
436 }
437 },
438 );
439
440 let Ok(editor_block) =
441 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
442 else {
443 return;
444 };
445
446 let new_cursor_pos = if let Some(next_cursor) = next_cell {
447 next_cursor
448 } else {
449 editor_block.invalidation_anchor
450 };
451
452 self.blocks
453 .insert(message.header.msg_id.clone(), editor_block);
454
455 match &self.kernel {
456 Kernel::RunningKernel(_) => {
457 self.send(message, cx).ok();
458 }
459 Kernel::StartingKernel(task) => {
460 // Queue up the execution as a task to run after the kernel starts
461 let task = task.clone();
462
463 cx.spawn(async move |this, cx| {
464 task.await;
465 this.update(cx, |session, cx| {
466 session.send(message, cx).ok();
467 })
468 .ok();
469 })
470 .detach();
471 }
472 _ => {}
473 }
474
475 if move_down {
476 editor.update(cx, move |editor, cx| {
477 editor.change_selections(
478 SelectionEffects::scroll(Autoscroll::top_relative(8)),
479 window,
480 cx,
481 |selections| {
482 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
483 },
484 );
485 });
486 }
487 }
488
489 pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
490 let parent_message_id = match message.parent_header.as_ref() {
491 Some(header) => &header.msg_id,
492 None => return,
493 };
494
495 match &message.content {
496 JupyterMessageContent::Status(status) => {
497 self.kernel.set_execution_state(&status.execution_state);
498
499 telemetry::event!(
500 "Kernel Status Changed",
501 kernel_language = self.kernel_specification.language(),
502 kernel_status = KernelStatus::from(&self.kernel).to_string(),
503 repl_session_id = cx.entity_id().to_string(),
504 );
505
506 cx.notify();
507 }
508 JupyterMessageContent::KernelInfoReply(reply) => {
509 self.kernel.set_kernel_info(reply);
510 cx.notify();
511 }
512 JupyterMessageContent::UpdateDisplayData(update) => {
513 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
514 display_id
515 } else {
516 return;
517 };
518
519 self.blocks.iter_mut().for_each(|(_, block)| {
520 block.execution_view.update(cx, |execution_view, cx| {
521 execution_view.update_display_data(&update.data, &display_id, window, cx);
522 });
523 });
524 return;
525 }
526 _ => {}
527 }
528
529 if let Some(block) = self.blocks.get_mut(parent_message_id) {
530 block.handle_message(message, window, cx);
531 }
532 }
533
534 pub fn interrupt(&mut self, cx: &mut Context<Self>) {
535 match &mut self.kernel {
536 Kernel::RunningKernel(_kernel) => {
537 self.send(InterruptRequest {}.into(), cx).ok();
538 }
539 Kernel::StartingKernel(_task) => {
540 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
541 }
542 _ => {}
543 }
544 }
545
546 pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
547 if let Kernel::Shutdown = kernel {
548 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
549 }
550
551 let kernel_status = KernelStatus::from(&kernel).to_string();
552 let kernel_language = self.kernel_specification.language();
553
554 telemetry::event!(
555 "Kernel Status Changed",
556 kernel_language,
557 kernel_status,
558 repl_session_id = cx.entity_id().to_string(),
559 );
560
561 self.kernel = kernel;
562 }
563
564 pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
565 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
566
567 match kernel {
568 Kernel::RunningKernel(mut kernel) => {
569 let mut request_tx = kernel.request_tx();
570
571 let forced = kernel.force_shutdown(window, cx);
572
573 cx.spawn(async move |this, cx| {
574 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
575 request_tx.try_send(message).ok();
576
577 forced.await.log_err();
578
579 // Give the kernel a bit of time to clean up
580 cx.background_executor().timer(Duration::from_secs(3)).await;
581
582 this.update(cx, |session, cx| {
583 session.clear_outputs(cx);
584 session.kernel(Kernel::Shutdown, cx);
585 cx.notify();
586 })
587 .ok();
588 })
589 .detach();
590 }
591 _ => {
592 self.kernel(Kernel::Shutdown, cx);
593 }
594 }
595 cx.notify();
596 }
597
598 pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
599 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
600
601 match kernel {
602 Kernel::Restarting => {
603 // Do nothing if already restarting
604 }
605 Kernel::RunningKernel(mut kernel) => {
606 let mut request_tx = kernel.request_tx();
607
608 let forced = kernel.force_shutdown(window, cx);
609
610 cx.spawn_in(window, async move |this, cx| {
611 // Send shutdown request with restart flag
612 log::debug!("restarting kernel");
613 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
614 request_tx.try_send(message).ok();
615
616 // Wait for kernel to shutdown
617 cx.background_executor().timer(Duration::from_secs(1)).await;
618
619 // Force kill the kernel if it hasn't shut down
620 forced.await.log_err();
621
622 // Start a new kernel
623 this.update_in(cx, |session, window, cx| {
624 // TODO: Differentiate between restart and restart+clear-outputs
625 session.clear_outputs(cx);
626 session.start_kernel(window, cx);
627 })
628 .ok();
629 })
630 .detach();
631 }
632 _ => {
633 self.clear_outputs(cx);
634 self.start_kernel(window, cx);
635 }
636 }
637 cx.notify();
638 }
639}
640
641pub enum SessionEvent {
642 Shutdown(WeakEntity<Editor>),
643}
644
645impl EventEmitter<SessionEvent> for Session {}
646
647impl Render for Session {
648 fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
649 let (status_text, interrupt_button) = match &self.kernel {
650 Kernel::RunningKernel(kernel) => (
651 kernel
652 .kernel_info()
653 .as_ref()
654 .map(|info| info.language_info.name.clone()),
655 Some(
656 Button::new("interrupt", "Interrupt")
657 .style(ButtonStyle::Subtle)
658 .on_click(cx.listener(move |session, _, _, cx| {
659 session.interrupt(cx);
660 })),
661 ),
662 ),
663 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
664 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
665 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
666 Kernel::Shutdown => (Some("Shutdown".into()), None),
667 Kernel::Restarting => (Some("Restarting".into()), None),
668 };
669
670 KernelListItem::new(self.kernel_specification.clone())
671 .status_color(match &self.kernel {
672 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
673 ExecutionState::Idle => Color::Success,
674 ExecutionState::Busy => Color::Modified,
675 },
676 Kernel::StartingKernel(_) => Color::Modified,
677 Kernel::ErroredLaunch(_) => Color::Error,
678 Kernel::ShuttingDown => Color::Modified,
679 Kernel::Shutdown => Color::Disabled,
680 Kernel::Restarting => Color::Modified,
681 })
682 .child(Label::new(self.kernel_specification.name()))
683 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
684 .button(
685 Button::new("shutdown", "Shutdown")
686 .style(ButtonStyle::Subtle)
687 .disabled(self.kernel.is_shutting_down())
688 .on_click(cx.listener(move |session, _, window, cx| {
689 session.shutdown(window, cx);
690 })),
691 )
692 .buttons(interrupt_button)
693 }
694}