1use crate::components::KernelListItem;
2use crate::kernels::RemoteRunningKernel;
3use crate::setup_editor_session_actions;
4use crate::{
5 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
6 outputs::{ExecutionStatus, ExecutionView},
7 KernelStatus,
8};
9use collections::{HashMap, HashSet};
10use editor::{
11 display_map::{
12 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
13 RenderBlock,
14 },
15 scroll::Autoscroll,
16 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
17};
18use futures::FutureExt as _;
19use gpui::{
20 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
29use theme::ActiveTheme;
30use ui::{prelude::*, IconButtonShape, Tooltip};
31use util::ResultExt as _;
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 pub kernel_specification: KernelSpecification,
39 _buffer_subscription: Subscription,
40}
41
42struct EditorBlock {
43 code_range: Range<Anchor>,
44 invalidation_anchor: Anchor,
45 block_id: CustomBlockId,
46 execution_view: View<ExecutionView>,
47}
48
49type CloseBlockFn =
50 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
51
52impl EditorBlock {
53 fn new(
54 editor: WeakView<Editor>,
55 code_range: Range<Anchor>,
56 status: ExecutionStatus,
57 on_close: CloseBlockFn,
58 cx: &mut ViewContext<Session>,
59 ) -> anyhow::Result<Self> {
60 let editor = editor
61 .upgrade()
62 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
63 let workspace = editor
64 .read(cx)
65 .workspace()
66 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
67
68 let execution_view =
69 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
70
71 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
72 let buffer = editor.buffer().clone();
73 let buffer_snapshot = buffer.read(cx).snapshot(cx);
74 let end_point = code_range.end.to_point(&buffer_snapshot);
75 let next_row_start = end_point + Point::new(1, 0);
76 if next_row_start > buffer_snapshot.max_point() {
77 buffer.update(cx, |buffer, cx| {
78 buffer.edit(
79 [(
80 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
81 "\n",
82 )],
83 None,
84 cx,
85 )
86 });
87 }
88
89 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
90 let block = BlockProperties {
91 placement: BlockPlacement::Below(code_range.end),
92 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
93 height: 1,
94 style: BlockStyle::Sticky,
95 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
96 priority: 0,
97 };
98
99 let block_id = editor.insert_blocks([block], None, cx)[0];
100 (block_id, invalidation_anchor)
101 });
102
103 anyhow::Ok(Self {
104 code_range,
105 invalidation_anchor,
106 block_id,
107 execution_view,
108 })
109 }
110
111 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
112 self.execution_view.update(cx, |execution_view, cx| {
113 execution_view.push_message(&message.content, cx);
114 });
115 }
116
117 fn create_output_area_renderer(
118 execution_view: View<ExecutionView>,
119 on_close: CloseBlockFn,
120 ) -> RenderBlock {
121 Arc::new(move |cx: &mut BlockContext| {
122 let execution_view = execution_view.clone();
123 let text_style = crate::outputs::plain::text_style(cx);
124
125 let gutter = cx.gutter_dimensions;
126
127 let block_id = cx.block_id;
128 let on_close = on_close.clone();
129
130 let rem_size = cx.rem_size();
131
132 let text_line_height = text_style.line_height_in_pixels(rem_size);
133
134 let close_button = h_flex()
135 .flex_none()
136 .items_center()
137 .justify_center()
138 .absolute()
139 .top(text_line_height / 2.)
140 .right(
141 // 2px is a magic number to nudge the button just a bit closer to
142 // the line number start
143 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
144 )
145 .w(text_line_height)
146 .h(text_line_height)
147 .child(
148 IconButton::new("close_output_area", IconName::Close)
149 .icon_size(IconSize::Small)
150 .icon_color(Color::Muted)
151 .size(ButtonSize::Compact)
152 .shape(IconButtonShape::Square)
153 .tooltip(|cx| Tooltip::text("Close output area", cx))
154 .on_click(move |_, cx| {
155 if let BlockId::Custom(block_id) = block_id {
156 (on_close)(block_id, cx)
157 }
158 }),
159 );
160
161 div()
162 .id(cx.block_id)
163 .block_mouse_down()
164 .flex()
165 .items_start()
166 .min_h(text_line_height)
167 .w_full()
168 .border_y_1()
169 .border_color(cx.theme().colors().border)
170 .bg(cx.theme().colors().background)
171 .child(
172 div()
173 .relative()
174 .w(gutter.full_width())
175 .h(text_line_height * 2)
176 .child(close_button),
177 )
178 .child(
179 div()
180 .flex_1()
181 .size_full()
182 .py(text_line_height / 2.)
183 .mr(gutter.width)
184 .child(execution_view),
185 )
186 .into_any_element()
187 })
188 }
189}
190
191impl Session {
192 pub fn new(
193 editor: WeakView<Editor>,
194 fs: Arc<dyn Fs>,
195 kernel_specification: KernelSpecification,
196 cx: &mut ViewContext<Self>,
197 ) -> Self {
198 let subscription = match editor.upgrade() {
199 Some(editor) => {
200 let buffer = editor.read(cx).buffer().clone();
201 cx.subscribe(&buffer, Self::on_buffer_event)
202 }
203 None => Subscription::new(|| {}),
204 };
205
206 let editor_handle = editor.clone();
207
208 editor
209 .update(cx, |editor, _cx| {
210 setup_editor_session_actions(editor, editor_handle);
211 })
212 .ok();
213
214 let mut session = Self {
215 fs,
216 editor,
217 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
218 blocks: HashMap::default(),
219 kernel_specification,
220 _buffer_subscription: subscription,
221 };
222
223 session.start_kernel(cx);
224 session
225 }
226
227 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
228 let kernel_language = self.kernel_specification.language();
229 let entity_id = self.editor.entity_id();
230 let working_directory = self
231 .editor
232 .upgrade()
233 .and_then(|editor| editor.read(cx).working_directory(cx))
234 .unwrap_or_else(temp_dir);
235
236 telemetry::event!(
237 "Kernel Status Changed",
238 kernel_language,
239 kernel_status = KernelStatus::Starting.to_string(),
240 repl_session_id = cx.entity_id().to_string(),
241 );
242
243 let session_view = cx.view().clone();
244
245 let kernel = match self.kernel_specification.clone() {
246 KernelSpecification::Jupyter(kernel_specification)
247 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
248 kernel_specification,
249 entity_id,
250 working_directory,
251 self.fs.clone(),
252 session_view,
253 cx,
254 ),
255 KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
256 remote_kernel_specification,
257 working_directory,
258 session_view,
259 cx,
260 ),
261 };
262
263 let pending_kernel = cx
264 .spawn(|this, mut cx| async move {
265 let kernel = kernel.await;
266
267 match kernel {
268 Ok(kernel) => {
269 this.update(&mut cx, |session, cx| {
270 session.kernel(Kernel::RunningKernel(kernel), cx);
271 })
272 .ok();
273 }
274 Err(err) => {
275 this.update(&mut cx, |session, cx| {
276 session.kernel_errored(err.to_string(), cx);
277 })
278 .ok();
279 }
280 }
281 })
282 .shared();
283
284 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
285 cx.notify();
286 }
287
288 pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext<Self>) {
289 self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
290
291 self.blocks.values().for_each(|block| {
292 block.execution_view.update(cx, |execution_view, cx| {
293 match execution_view.status {
294 ExecutionStatus::Finished => {
295 // Do nothing when the output was good
296 }
297 _ => {
298 // All other cases, set the status to errored
299 execution_view.status =
300 ExecutionStatus::KernelErrored(error_message.clone())
301 }
302 }
303 cx.notify();
304 });
305 });
306 }
307
308 fn on_buffer_event(
309 &mut self,
310 buffer: Model<MultiBuffer>,
311 event: &multi_buffer::Event,
312 cx: &mut ViewContext<Self>,
313 ) {
314 if let multi_buffer::Event::Edited { .. } = event {
315 let snapshot = buffer.read(cx).snapshot(cx);
316
317 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
318
319 self.blocks.retain(|_id, block| {
320 if block.invalidation_anchor.is_valid(&snapshot) {
321 true
322 } else {
323 blocks_to_remove.insert(block.block_id);
324 false
325 }
326 });
327
328 if !blocks_to_remove.is_empty() {
329 self.editor
330 .update(cx, |editor, cx| {
331 editor.remove_blocks(blocks_to_remove, None, cx);
332 })
333 .ok();
334 cx.notify();
335 }
336 }
337 }
338
339 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
340 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
341 kernel.request_tx().try_send(message).ok();
342 }
343
344 anyhow::Ok(())
345 }
346
347 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
348 let blocks_to_remove: HashSet<CustomBlockId> =
349 self.blocks.values().map(|block| block.block_id).collect();
350
351 self.editor
352 .update(cx, |editor, cx| {
353 editor.remove_blocks(blocks_to_remove, None, cx);
354 })
355 .ok();
356
357 self.blocks.clear();
358 }
359
360 pub fn execute(
361 &mut self,
362 code: String,
363 anchor_range: Range<Anchor>,
364 next_cell: Option<Anchor>,
365 move_down: bool,
366 cx: &mut ViewContext<Self>,
367 ) {
368 let Some(editor) = self.editor.upgrade() else {
369 return;
370 };
371
372 if code.is_empty() {
373 return;
374 }
375
376 let execute_request = ExecuteRequest {
377 code,
378 ..ExecuteRequest::default()
379 };
380
381 let message: JupyterMessage = execute_request.into();
382
383 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
384
385 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
386
387 self.blocks.retain(|_key, block| {
388 if anchor_range.overlaps(&block.code_range, &buffer) {
389 blocks_to_remove.insert(block.block_id);
390 false
391 } else {
392 true
393 }
394 });
395
396 self.editor
397 .update(cx, |editor, cx| {
398 editor.remove_blocks(blocks_to_remove, None, cx);
399 })
400 .ok();
401
402 let status = match &self.kernel {
403 Kernel::Restarting => ExecutionStatus::Restarting,
404 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
405 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
406 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
407 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
408 Kernel::Shutdown => ExecutionStatus::Shutdown,
409 };
410
411 let parent_message_id = message.header.msg_id.clone();
412 let session_view = cx.view().downgrade();
413 let weak_editor = self.editor.clone();
414
415 let on_close: CloseBlockFn =
416 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
417 if let Some(session) = session_view.upgrade() {
418 session.update(cx, |session, cx| {
419 session.blocks.remove(&parent_message_id);
420 cx.notify();
421 });
422 }
423
424 if let Some(editor) = weak_editor.upgrade() {
425 editor.update(cx, |editor, cx| {
426 let mut block_ids = HashSet::default();
427 block_ids.insert(block_id);
428 editor.remove_blocks(block_ids, None, cx);
429 });
430 }
431 });
432
433 let Ok(editor_block) =
434 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
435 else {
436 return;
437 };
438
439 let new_cursor_pos = if let Some(next_cursor) = next_cell {
440 next_cursor
441 } else {
442 editor_block.invalidation_anchor
443 };
444
445 self.blocks
446 .insert(message.header.msg_id.clone(), editor_block);
447
448 match &self.kernel {
449 Kernel::RunningKernel(_) => {
450 self.send(message, cx).ok();
451 }
452 Kernel::StartingKernel(task) => {
453 // Queue up the execution as a task to run after the kernel starts
454 let task = task.clone();
455 let message = message.clone();
456
457 cx.spawn(|this, mut cx| async move {
458 task.await;
459 this.update(&mut cx, |session, cx| {
460 session.send(message, cx).ok();
461 })
462 .ok();
463 })
464 .detach();
465 }
466 _ => {}
467 }
468
469 if move_down {
470 editor.update(cx, move |editor, cx| {
471 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
472 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
473 });
474 });
475 }
476 }
477
478 pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
479 let parent_message_id = match message.parent_header.as_ref() {
480 Some(header) => &header.msg_id,
481 None => return,
482 };
483
484 match &message.content {
485 JupyterMessageContent::Status(status) => {
486 self.kernel.set_execution_state(&status.execution_state);
487
488 telemetry::event!(
489 "Kernel Status Changed",
490 kernel_language = self.kernel_specification.language(),
491 kernel_status = KernelStatus::from(&self.kernel).to_string(),
492 repl_session_id = cx.entity_id().to_string(),
493 );
494
495 cx.notify();
496 }
497 JupyterMessageContent::KernelInfoReply(reply) => {
498 self.kernel.set_kernel_info(reply);
499 cx.notify();
500 }
501 JupyterMessageContent::UpdateDisplayData(update) => {
502 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
503 display_id
504 } else {
505 return;
506 };
507
508 self.blocks.iter_mut().for_each(|(_, block)| {
509 block.execution_view.update(cx, |execution_view, cx| {
510 execution_view.update_display_data(&update.data, &display_id, cx);
511 });
512 });
513 return;
514 }
515 _ => {}
516 }
517
518 if let Some(block) = self.blocks.get_mut(parent_message_id) {
519 block.handle_message(message, cx);
520 }
521 }
522
523 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
524 match &mut self.kernel {
525 Kernel::RunningKernel(_kernel) => {
526 self.send(InterruptRequest {}.into(), cx).ok();
527 }
528 Kernel::StartingKernel(_task) => {
529 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
530 }
531 _ => {}
532 }
533 }
534
535 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
536 if let Kernel::Shutdown = kernel {
537 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
538 }
539
540 let kernel_status = KernelStatus::from(&kernel).to_string();
541 let kernel_language = self.kernel_specification.language();
542
543 telemetry::event!(
544 "Kernel Status Changed",
545 kernel_language,
546 kernel_status,
547 repl_session_id = cx.entity_id().to_string(),
548 );
549
550 self.kernel = kernel;
551 }
552
553 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
554 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
555
556 match kernel {
557 Kernel::RunningKernel(mut kernel) => {
558 let mut request_tx = kernel.request_tx().clone();
559
560 let forced = kernel.force_shutdown(cx);
561
562 cx.spawn(|this, mut cx| async move {
563 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
564 request_tx.try_send(message).ok();
565
566 forced.await.log_err();
567
568 // Give the kernel a bit of time to clean up
569 cx.background_executor().timer(Duration::from_secs(3)).await;
570
571 this.update(&mut cx, |session, cx| {
572 session.clear_outputs(cx);
573 session.kernel(Kernel::Shutdown, cx);
574 cx.notify();
575 })
576 .ok();
577 })
578 .detach();
579 }
580 _ => {
581 self.kernel(Kernel::Shutdown, cx);
582 }
583 }
584 cx.notify();
585 }
586
587 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
588 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
589
590 match kernel {
591 Kernel::Restarting => {
592 // Do nothing if already restarting
593 }
594 Kernel::RunningKernel(mut kernel) => {
595 let mut request_tx = kernel.request_tx().clone();
596
597 let forced = kernel.force_shutdown(cx);
598
599 cx.spawn(|this, mut cx| async move {
600 // Send shutdown request with restart flag
601 log::debug!("restarting kernel");
602 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
603 request_tx.try_send(message).ok();
604
605 // Wait for kernel to shutdown
606 cx.background_executor().timer(Duration::from_secs(1)).await;
607
608 // Force kill the kernel if it hasn't shut down
609 forced.await.log_err();
610
611 // Start a new kernel
612 this.update(&mut cx, |session, cx| {
613 // TODO: Differentiate between restart and restart+clear-outputs
614 session.clear_outputs(cx);
615 session.start_kernel(cx);
616 })
617 .ok();
618 })
619 .detach();
620 }
621 _ => {
622 self.clear_outputs(cx);
623 self.start_kernel(cx);
624 }
625 }
626 cx.notify();
627 }
628}
629
630pub enum SessionEvent {
631 Shutdown(WeakView<Editor>),
632}
633
634impl EventEmitter<SessionEvent> for Session {}
635
636impl Render for Session {
637 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
638 let (status_text, interrupt_button) = match &self.kernel {
639 Kernel::RunningKernel(kernel) => (
640 kernel
641 .kernel_info()
642 .as_ref()
643 .map(|info| info.language_info.name.clone()),
644 Some(
645 Button::new("interrupt", "Interrupt")
646 .style(ButtonStyle::Subtle)
647 .on_click(cx.listener(move |session, _, cx| {
648 session.interrupt(cx);
649 })),
650 ),
651 ),
652 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
653 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
654 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
655 Kernel::Shutdown => (Some("Shutdown".into()), None),
656 Kernel::Restarting => (Some("Restarting".into()), None),
657 };
658
659 KernelListItem::new(self.kernel_specification.clone())
660 .status_color(match &self.kernel {
661 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
662 ExecutionState::Idle => Color::Success,
663 ExecutionState::Busy => Color::Modified,
664 },
665 Kernel::StartingKernel(_) => Color::Modified,
666 Kernel::ErroredLaunch(_) => Color::Error,
667 Kernel::ShuttingDown => Color::Modified,
668 Kernel::Shutdown => Color::Disabled,
669 Kernel::Restarting => Color::Modified,
670 })
671 .child(Label::new(self.kernel_specification.name()))
672 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
673 .button(
674 Button::new("shutdown", "Shutdown")
675 .style(ButtonStyle::Subtle)
676 .disabled(self.kernel.is_shutting_down())
677 .on_click(cx.listener(move |session, _, cx| {
678 session.shutdown(cx);
679 })),
680 )
681 .buttons(interrupt_button)
682 }
683}