1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 KernelStatus,
6 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
7 outputs::{ExecutionStatus, ExecutionView},
8};
9use collections::{HashMap, HashSet};
10use editor::{
11 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
12 display_map::{
13 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
14 RenderBlock,
15 },
16 scroll::Autoscroll,
17};
18use futures::FutureExt as _;
19use gpui::{
20 Context, Entity, EventEmitter, Render, Subscription, Task, WeakEntity, Window, div, prelude::*,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
29use theme::ActiveTheme;
30use ui::{IconButtonShape, Tooltip, prelude::*};
31use util::ResultExt as _;
32
/// A REPL session: pairs one editor with one kernel and keeps track of the
/// output blocks that executions have inserted into that editor.
pub struct Session {
    /// Filesystem handle; forwarded to `NativeRunningKernel::new` when a
    /// native kernel is started.
    fs: Arc<dyn Fs>,
    /// Weak handle to the editor this session is attached to.
    editor: WeakEntity<Editor>,
    /// Current kernel state (starting, running, errored, restarting,
    /// shutting down or shut down).
    pub kernel: Kernel,
    /// Output blocks, keyed by the `msg_id` of the execute request that
    /// created them. Incoming messages are routed via their parent `msg_id`.
    blocks: HashMap<String, EditorBlock>,
    /// Specification used to launch the kernel for this session.
    pub kernel_specification: KernelSpecification,
    /// Subscription to the editor's buffer events (see `on_buffer_event`).
    // NOTE(review): presumably stored only to keep the subscription alive for
    // the lifetime of the session — confirm against gpui `Subscription`.
    _buffer_subscription: Subscription,
}
41
/// One output area inserted into the editor for a single execution.
struct EditorBlock {
    /// Range of the code that was executed. A later execution whose range
    /// overlaps this one replaces the block.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following the executed code. The block
    /// is removed once this anchor is no longer valid in the buffer.
    invalidation_anchor: Anchor,
    /// Id of the custom block inserted into the editor.
    block_id: CustomBlockId,
    /// View that receives kernel messages for this execution and renders them.
    execution_view: Entity<ExecutionView>,
}
48
/// Shared callback invoked when the user clicks an output block's close
/// button; receives the id of the block to close.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut Window, &mut App) + Send + Sync + 'static>;
51
52impl EditorBlock {
53 fn new(
54 editor: WeakEntity<Editor>,
55 code_range: Range<Anchor>,
56 status: ExecutionStatus,
57 on_close: CloseBlockFn,
58 cx: &mut Context<Session>,
59 ) -> anyhow::Result<Self> {
60 let editor = editor
61 .upgrade()
62 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
63 let workspace = editor
64 .read(cx)
65 .workspace()
66 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
67
68 let execution_view = cx.new(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
69
70 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
71 let buffer = editor.buffer().clone();
72 let buffer_snapshot = buffer.read(cx).snapshot(cx);
73 let end_point = code_range.end.to_point(&buffer_snapshot);
74 let next_row_start = end_point + Point::new(1, 0);
75 if next_row_start > buffer_snapshot.max_point() {
76 buffer.update(cx, |buffer, cx| {
77 buffer.edit(
78 [(
79 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
80 "\n",
81 )],
82 None,
83 cx,
84 )
85 });
86 }
87
88 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
89 let block = BlockProperties {
90 placement: BlockPlacement::Below(code_range.end),
91 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
92 height: Some(1),
93 style: BlockStyle::Sticky,
94 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
95 priority: 0,
96 };
97
98 let block_id = editor.insert_blocks([block], None, cx)[0];
99 (block_id, invalidation_anchor)
100 });
101
102 anyhow::Ok(Self {
103 code_range,
104 invalidation_anchor,
105 block_id,
106 execution_view,
107 })
108 }
109
110 fn handle_message(
111 &mut self,
112 message: &JupyterMessage,
113 window: &mut Window,
114 cx: &mut Context<Session>,
115 ) {
116 self.execution_view.update(cx, |execution_view, cx| {
117 execution_view.push_message(&message.content, window, cx);
118 });
119 }
120
121 fn create_output_area_renderer(
122 execution_view: Entity<ExecutionView>,
123 on_close: CloseBlockFn,
124 ) -> RenderBlock {
125 Arc::new(move |cx: &mut BlockContext| {
126 let execution_view = execution_view.clone();
127 let text_style = crate::outputs::plain::text_style(cx.window, cx.app);
128
129 let gutter = cx.gutter_dimensions;
130
131 let block_id = cx.block_id;
132 let on_close = on_close.clone();
133
134 let rem_size = cx.window.rem_size();
135
136 let text_line_height = text_style.line_height_in_pixels(rem_size);
137
138 let close_button = h_flex()
139 .flex_none()
140 .items_center()
141 .justify_center()
142 .absolute()
143 .top(text_line_height / 2.)
144 .right(
145 // 2px is a magic number to nudge the button just a bit closer to
146 // the line number start
147 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
148 )
149 .w(text_line_height)
150 .h(text_line_height)
151 .child(
152 IconButton::new("close_output_area", IconName::Close)
153 .icon_size(IconSize::Small)
154 .icon_color(Color::Muted)
155 .size(ButtonSize::Compact)
156 .shape(IconButtonShape::Square)
157 .tooltip(Tooltip::text("Close output area"))
158 .on_click(move |_, window, cx| {
159 if let BlockId::Custom(block_id) = block_id {
160 (on_close)(block_id, window, cx)
161 }
162 }),
163 );
164
165 div()
166 .id(cx.block_id)
167 .block_mouse_down()
168 .flex()
169 .items_start()
170 .min_h(text_line_height)
171 .w_full()
172 .border_y_1()
173 .border_color(cx.theme().colors().border)
174 .bg(cx.theme().colors().background)
175 .child(
176 div()
177 .relative()
178 .w(gutter.full_width())
179 .h(text_line_height * 2)
180 .child(close_button),
181 )
182 .child(
183 div()
184 .flex_1()
185 .size_full()
186 .py(text_line_height / 2.)
187 .mr(gutter.width)
188 .child(execution_view),
189 )
190 .into_any_element()
191 })
192 }
193}
194
195impl Session {
196 pub fn new(
197 editor: WeakEntity<Editor>,
198 fs: Arc<dyn Fs>,
199 kernel_specification: KernelSpecification,
200 window: &mut Window,
201 cx: &mut Context<Self>,
202 ) -> Self {
203 let subscription = match editor.upgrade() {
204 Some(editor) => {
205 let buffer = editor.read(cx).buffer().clone();
206 cx.subscribe(&buffer, Self::on_buffer_event)
207 }
208 None => Subscription::new(|| {}),
209 };
210
211 let editor_handle = editor.clone();
212
213 editor
214 .update(cx, |editor, _cx| {
215 setup_editor_session_actions(editor, editor_handle);
216 })
217 .ok();
218
219 let mut session = Self {
220 fs,
221 editor,
222 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
223 blocks: HashMap::default(),
224 kernel_specification,
225 _buffer_subscription: subscription,
226 };
227
228 session.start_kernel(window, cx);
229 session
230 }
231
232 fn start_kernel(&mut self, window: &mut Window, cx: &mut Context<Self>) {
233 let kernel_language = self.kernel_specification.language();
234 let entity_id = self.editor.entity_id();
235 let working_directory = self
236 .editor
237 .upgrade()
238 .and_then(|editor| editor.read(cx).working_directory(cx))
239 .unwrap_or_else(temp_dir);
240
241 telemetry::event!(
242 "Kernel Status Changed",
243 kernel_language,
244 kernel_status = KernelStatus::Starting.to_string(),
245 repl_session_id = cx.entity_id().to_string(),
246 );
247
248 let session_view = cx.entity().clone();
249
250 let kernel = match self.kernel_specification.clone() {
251 KernelSpecification::Jupyter(kernel_specification)
252 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
253 kernel_specification,
254 entity_id,
255 working_directory,
256 self.fs.clone(),
257 session_view,
258 window,
259 cx,
260 ),
261 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
262 remote_kernel_specification,
263 working_directory,
264 session_view,
265 window,
266 cx,
267 ),
268 };
269
270 let pending_kernel = cx
271 .spawn(async move |this, cx| {
272 let kernel = kernel.await;
273
274 match kernel {
275 Ok(kernel) => {
276 this.update(cx, |session, cx| {
277 session.kernel(Kernel::RunningKernel(kernel), cx);
278 })
279 .ok();
280 }
281 Err(err) => {
282 this.update(cx, |session, cx| {
283 session.kernel_errored(err.to_string(), cx);
284 })
285 .ok();
286 }
287 }
288 })
289 .shared();
290
291 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
292 cx.notify();
293 }
294
295 pub fn kernel_errored(&mut self, error_message: String, cx: &mut Context<Self>) {
296 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
297
298 self.blocks.values().for_each(|block| {
299 block.execution_view.update(cx, |execution_view, cx| {
300 match execution_view.status {
301 ExecutionStatus::Finished => {
302 // Do nothing when the output was good
303 }
304 _ => {
305 // All other cases, set the status to errored
306 execution_view.status =
307 ExecutionStatus::KernelErrored(error_message.clone())
308 }
309 }
310 cx.notify();
311 });
312 });
313 }
314
315 fn on_buffer_event(
316 &mut self,
317 buffer: Entity<MultiBuffer>,
318 event: &multi_buffer::Event,
319 cx: &mut Context<Self>,
320 ) {
321 if let multi_buffer::Event::Edited { .. } = event {
322 let snapshot = buffer.read(cx).snapshot(cx);
323
324 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
325
326 self.blocks.retain(|_id, block| {
327 if block.invalidation_anchor.is_valid(&snapshot) {
328 true
329 } else {
330 blocks_to_remove.insert(block.block_id);
331 false
332 }
333 });
334
335 if !blocks_to_remove.is_empty() {
336 self.editor
337 .update(cx, |editor, cx| {
338 editor.remove_blocks(blocks_to_remove, None, cx);
339 })
340 .ok();
341 cx.notify();
342 }
343 }
344 }
345
346 fn send(&mut self, message: JupyterMessage, _cx: &mut Context<Self>) -> anyhow::Result<()> {
347 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
348 kernel.request_tx().try_send(message).ok();
349 }
350
351 anyhow::Ok(())
352 }
353
354 pub fn clear_outputs(&mut self, cx: &mut Context<Self>) {
355 let blocks_to_remove: HashSet<CustomBlockId> =
356 self.blocks.values().map(|block| block.block_id).collect();
357
358 self.editor
359 .update(cx, |editor, cx| {
360 editor.remove_blocks(blocks_to_remove, None, cx);
361 })
362 .ok();
363
364 self.blocks.clear();
365 }
366
367 pub fn execute(
368 &mut self,
369 code: String,
370 anchor_range: Range<Anchor>,
371 next_cell: Option<Anchor>,
372 move_down: bool,
373 window: &mut Window,
374 cx: &mut Context<Self>,
375 ) {
376 let Some(editor) = self.editor.upgrade() else {
377 return;
378 };
379
380 if code.is_empty() {
381 return;
382 }
383
384 let execute_request = ExecuteRequest {
385 code,
386 ..ExecuteRequest::default()
387 };
388
389 let message: JupyterMessage = execute_request.into();
390
391 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
392
393 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
394
395 self.blocks.retain(|_key, block| {
396 if anchor_range.overlaps(&block.code_range, &buffer) {
397 blocks_to_remove.insert(block.block_id);
398 false
399 } else {
400 true
401 }
402 });
403
404 self.editor
405 .update(cx, |editor, cx| {
406 editor.remove_blocks(blocks_to_remove, None, cx);
407 })
408 .ok();
409
410 let status = match &self.kernel {
411 Kernel::Restarting => ExecutionStatus::Restarting,
412 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
413 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
414 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
415 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
416 Kernel::Shutdown => ExecutionStatus::Shutdown,
417 };
418
419 let parent_message_id = message.header.msg_id.clone();
420 let session_view = cx.entity().downgrade();
421 let weak_editor = self.editor.clone();
422
423 let on_close: CloseBlockFn = Arc::new(
424 move |block_id: CustomBlockId, _: &mut Window, cx: &mut App| {
425 if let Some(session) = session_view.upgrade() {
426 session.update(cx, |session, cx| {
427 session.blocks.remove(&parent_message_id);
428 cx.notify();
429 });
430 }
431
432 if let Some(editor) = weak_editor.upgrade() {
433 editor.update(cx, |editor, cx| {
434 let mut block_ids = HashSet::default();
435 block_ids.insert(block_id);
436 editor.remove_blocks(block_ids, None, cx);
437 });
438 }
439 },
440 );
441
442 let Ok(editor_block) =
443 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
444 else {
445 return;
446 };
447
448 let new_cursor_pos = if let Some(next_cursor) = next_cell {
449 next_cursor
450 } else {
451 editor_block.invalidation_anchor
452 };
453
454 self.blocks
455 .insert(message.header.msg_id.clone(), editor_block);
456
457 match &self.kernel {
458 Kernel::RunningKernel(_) => {
459 self.send(message, cx).ok();
460 }
461 Kernel::StartingKernel(task) => {
462 // Queue up the execution as a task to run after the kernel starts
463 let task = task.clone();
464 let message = message.clone();
465
466 cx.spawn(async move |this, cx| {
467 task.await;
468 this.update(cx, |session, cx| {
469 session.send(message, cx).ok();
470 })
471 .ok();
472 })
473 .detach();
474 }
475 _ => {}
476 }
477
478 if move_down {
479 editor.update(cx, move |editor, cx| {
480 editor.change_selections(
481 Some(Autoscroll::top_relative(8)),
482 window,
483 cx,
484 |selections| {
485 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
486 },
487 );
488 });
489 }
490 }
491
492 pub fn route(&mut self, message: &JupyterMessage, window: &mut Window, cx: &mut Context<Self>) {
493 let parent_message_id = match message.parent_header.as_ref() {
494 Some(header) => &header.msg_id,
495 None => return,
496 };
497
498 match &message.content {
499 JupyterMessageContent::Status(status) => {
500 self.kernel.set_execution_state(&status.execution_state);
501
502 telemetry::event!(
503 "Kernel Status Changed",
504 kernel_language = self.kernel_specification.language(),
505 kernel_status = KernelStatus::from(&self.kernel).to_string(),
506 repl_session_id = cx.entity_id().to_string(),
507 );
508
509 cx.notify();
510 }
511 JupyterMessageContent::KernelInfoReply(reply) => {
512 self.kernel.set_kernel_info(reply);
513 cx.notify();
514 }
515 JupyterMessageContent::UpdateDisplayData(update) => {
516 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
517 display_id
518 } else {
519 return;
520 };
521
522 self.blocks.iter_mut().for_each(|(_, block)| {
523 block.execution_view.update(cx, |execution_view, cx| {
524 execution_view.update_display_data(&update.data, &display_id, window, cx);
525 });
526 });
527 return;
528 }
529 _ => {}
530 }
531
532 if let Some(block) = self.blocks.get_mut(parent_message_id) {
533 block.handle_message(message, window, cx);
534 }
535 }
536
537 pub fn interrupt(&mut self, cx: &mut Context<Self>) {
538 match &mut self.kernel {
539 Kernel::RunningKernel(_kernel) => {
540 self.send(InterruptRequest {}.into(), cx).ok();
541 }
542 Kernel::StartingKernel(_task) => {
543 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
544 }
545 _ => {}
546 }
547 }
548
549 pub fn kernel(&mut self, kernel: Kernel, cx: &mut Context<Self>) {
550 if let Kernel::Shutdown = kernel {
551 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
552 }
553
554 let kernel_status = KernelStatus::from(&kernel).to_string();
555 let kernel_language = self.kernel_specification.language();
556
557 telemetry::event!(
558 "Kernel Status Changed",
559 kernel_language,
560 kernel_status,
561 repl_session_id = cx.entity_id().to_string(),
562 );
563
564 self.kernel = kernel;
565 }
566
567 pub fn shutdown(&mut self, window: &mut Window, cx: &mut Context<Self>) {
568 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
569
570 match kernel {
571 Kernel::RunningKernel(mut kernel) => {
572 let mut request_tx = kernel.request_tx().clone();
573
574 let forced = kernel.force_shutdown(window, cx);
575
576 cx.spawn(async move |this, cx| {
577 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
578 request_tx.try_send(message).ok();
579
580 forced.await.log_err();
581
582 // Give the kernel a bit of time to clean up
583 cx.background_executor().timer(Duration::from_secs(3)).await;
584
585 this.update(cx, |session, cx| {
586 session.clear_outputs(cx);
587 session.kernel(Kernel::Shutdown, cx);
588 cx.notify();
589 })
590 .ok();
591 })
592 .detach();
593 }
594 _ => {
595 self.kernel(Kernel::Shutdown, cx);
596 }
597 }
598 cx.notify();
599 }
600
601 pub fn restart(&mut self, window: &mut Window, cx: &mut Context<Self>) {
602 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
603
604 match kernel {
605 Kernel::Restarting => {
606 // Do nothing if already restarting
607 }
608 Kernel::RunningKernel(mut kernel) => {
609 let mut request_tx = kernel.request_tx().clone();
610
611 let forced = kernel.force_shutdown(window, cx);
612
613 cx.spawn_in(window, async move |this, cx| {
614 // Send shutdown request with restart flag
615 log::debug!("restarting kernel");
616 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
617 request_tx.try_send(message).ok();
618
619 // Wait for kernel to shutdown
620 cx.background_executor().timer(Duration::from_secs(1)).await;
621
622 // Force kill the kernel if it hasn't shut down
623 forced.await.log_err();
624
625 // Start a new kernel
626 this.update_in(cx, |session, window, cx| {
627 // TODO: Differentiate between restart and restart+clear-outputs
628 session.clear_outputs(cx);
629 session.start_kernel(window, cx);
630 })
631 .ok();
632 })
633 .detach();
634 }
635 _ => {
636 self.clear_outputs(cx);
637 self.start_kernel(window, cx);
638 }
639 }
640 cx.notify();
641 }
642}
643
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// The session's kernel state was set to `Kernel::Shutdown`; carries the
    /// weak handle of the editor the session belongs to.
    Shutdown(WeakEntity<Editor>),
}
647
// Lets `Session` emit `SessionEvent`s through its context (see `Session::kernel`).
impl EventEmitter<SessionEvent> for Session {}
649
650impl Render for Session {
651 fn render(&mut self, _: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
652 let (status_text, interrupt_button) = match &self.kernel {
653 Kernel::RunningKernel(kernel) => (
654 kernel
655 .kernel_info()
656 .as_ref()
657 .map(|info| info.language_info.name.clone()),
658 Some(
659 Button::new("interrupt", "Interrupt")
660 .style(ButtonStyle::Subtle)
661 .on_click(cx.listener(move |session, _, _, cx| {
662 session.interrupt(cx);
663 })),
664 ),
665 ),
666 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
667 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
668 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
669 Kernel::Shutdown => (Some("Shutdown".into()), None),
670 Kernel::Restarting => (Some("Restarting".into()), None),
671 };
672
673 KernelListItem::new(self.kernel_specification.clone())
674 .status_color(match &self.kernel {
675 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
676 ExecutionState::Idle => Color::Success,
677 ExecutionState::Busy => Color::Modified,
678 },
679 Kernel::StartingKernel(_) => Color::Modified,
680 Kernel::ErroredLaunch(_) => Color::Error,
681 Kernel::ShuttingDown => Color::Modified,
682 Kernel::Shutdown => Color::Disabled,
683 Kernel::Restarting => Color::Modified,
684 })
685 .child(Label::new(self.kernel_specification.name()))
686 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
687 .button(
688 Button::new("shutdown", "Shutdown")
689 .style(ButtonStyle::Subtle)
690 .disabled(self.kernel.is_shutting_down())
691 .on_click(cx.listener(move |session, _, window, cx| {
692 session.shutdown(window, cx);
693 })),
694 )
695 .buttons(interrupt_button)
696 }
697}