1use crate::components::KernelListItem;
2use crate::setup_editor_session_actions;
3use crate::{
4 kernels::{Kernel, KernelSpecification, NativeRunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6 KernelStatus,
7};
8use client::telemetry::Telemetry;
9use collections::{HashMap, HashSet};
10use editor::{
11 display_map::{
12 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
13 RenderBlock,
14 },
15 scroll::Autoscroll,
16 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
17};
18use futures::io::BufReader;
19use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
20use gpui::{
21 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Option<Task<()>>,
39 process_status_task: Option<Task<()>>,
40 pub kernel_specification: KernelSpecification,
41 telemetry: Arc<Telemetry>,
42 _buffer_subscription: Subscription,
43}
44
45struct EditorBlock {
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50}
51
52type CloseBlockFn =
53 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let editor = editor
64 .upgrade()
65 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
66 let workspace = editor
67 .read(cx)
68 .workspace()
69 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
70
71 let execution_view =
72 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
73
74 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
75 let buffer = editor.buffer().clone();
76 let buffer_snapshot = buffer.read(cx).snapshot(cx);
77 let end_point = code_range.end.to_point(&buffer_snapshot);
78 let next_row_start = end_point + Point::new(1, 0);
79 if next_row_start > buffer_snapshot.max_point() {
80 buffer.update(cx, |buffer, cx| {
81 buffer.edit(
82 [(
83 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
84 "\n",
85 )],
86 None,
87 cx,
88 )
89 });
90 }
91
92 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
93 let block = BlockProperties {
94 placement: BlockPlacement::Below(code_range.end),
95 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
96 height: 1,
97 style: BlockStyle::Sticky,
98 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
99 priority: 0,
100 };
101
102 let block_id = editor.insert_blocks([block], None, cx)[0];
103 (block_id, invalidation_anchor)
104 });
105
106 anyhow::Ok(Self {
107 code_range,
108 invalidation_anchor,
109 block_id,
110 execution_view,
111 })
112 }
113
114 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
115 self.execution_view.update(cx, |execution_view, cx| {
116 execution_view.push_message(&message.content, cx);
117 });
118 }
119
120 fn create_output_area_renderer(
121 execution_view: View<ExecutionView>,
122 on_close: CloseBlockFn,
123 ) -> RenderBlock {
124 Arc::new(move |cx: &mut BlockContext| {
125 let execution_view = execution_view.clone();
126 let text_style = crate::outputs::plain::text_style(cx);
127
128 let gutter = cx.gutter_dimensions;
129
130 let block_id = cx.block_id;
131 let on_close = on_close.clone();
132
133 let rem_size = cx.rem_size();
134
135 let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137 let close_button = h_flex()
138 .flex_none()
139 .items_center()
140 .justify_center()
141 .absolute()
142 .top(text_line_height / 2.)
143 .right(
144 // 2px is a magic number to nudge the button just a bit closer to
145 // the line number start
146 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147 )
148 .w(text_line_height)
149 .h(text_line_height)
150 .child(
151 IconButton::new("close_output_area", IconName::Close)
152 .icon_size(IconSize::Small)
153 .icon_color(Color::Muted)
154 .size(ButtonSize::Compact)
155 .shape(IconButtonShape::Square)
156 .tooltip(|cx| Tooltip::text("Close output area", cx))
157 .on_click(move |_, cx| {
158 if let BlockId::Custom(block_id) = block_id {
159 (on_close)(block_id, cx)
160 }
161 }),
162 );
163
164 div()
165 .id(cx.block_id)
166 .block_mouse_down()
167 .flex()
168 .items_start()
169 .min_h(text_line_height)
170 .w_full()
171 .border_y_1()
172 .border_color(cx.theme().colors().border)
173 .bg(cx.theme().colors().background)
174 .child(
175 div()
176 .relative()
177 .w(gutter.full_width())
178 .h(text_line_height * 2)
179 .child(close_button),
180 )
181 .child(
182 div()
183 .flex_1()
184 .size_full()
185 .py(text_line_height / 2.)
186 .mr(gutter.width)
187 .child(execution_view),
188 )
189 .into_any_element()
190 })
191 }
192}
193
194impl Session {
195 pub fn new(
196 editor: WeakView<Editor>,
197 fs: Arc<dyn Fs>,
198 telemetry: Arc<Telemetry>,
199 kernel_specification: KernelSpecification,
200 cx: &mut ViewContext<Self>,
201 ) -> Self {
202 let subscription = match editor.upgrade() {
203 Some(editor) => {
204 let buffer = editor.read(cx).buffer().clone();
205 cx.subscribe(&buffer, Self::on_buffer_event)
206 }
207 None => Subscription::new(|| {}),
208 };
209
210 let editor_handle = editor.clone();
211
212 editor
213 .update(cx, |editor, _cx| {
214 setup_editor_session_actions(editor, editor_handle);
215 })
216 .ok();
217
218 let mut session = Self {
219 fs,
220 editor,
221 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
222 messaging_task: None,
223 process_status_task: None,
224 blocks: HashMap::default(),
225 kernel_specification,
226 _buffer_subscription: subscription,
227 telemetry,
228 };
229
230 session.start_kernel(cx);
231 session
232 }
233
234 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
235 let kernel_language = self.kernel_specification.language();
236 let entity_id = self.editor.entity_id();
237 let working_directory = self
238 .editor
239 .upgrade()
240 .and_then(|editor| editor.read(cx).working_directory(cx))
241 .unwrap_or_else(temp_dir);
242
243 self.telemetry.report_repl_event(
244 kernel_language.into(),
245 KernelStatus::Starting.to_string(),
246 cx.entity_id().to_string(),
247 );
248
249 let kernel = match self.kernel_specification.clone() {
250 KernelSpecification::Jupyter(kernel_specification)
251 | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
252 kernel_specification,
253 entity_id,
254 working_directory,
255 self.fs.clone(),
256 cx,
257 ),
258 KernelSpecification::Remote(_remote_kernel_specification) => {
259 unimplemented!()
260 }
261 };
262
263 let pending_kernel = cx
264 .spawn(|this, mut cx| async move {
265 let kernel = kernel.await;
266
267 match kernel {
268 Ok((mut kernel, mut messages_rx)) => {
269 this.update(&mut cx, |session, cx| {
270 let stderr = kernel.process.stderr.take();
271
272 cx.spawn(|_session, mut _cx| async move {
273 if stderr.is_none() {
274 return;
275 }
276 let reader = BufReader::new(stderr.unwrap());
277 let mut lines = reader.lines();
278 while let Some(Ok(line)) = lines.next().await {
279 // todo!(): Log stdout and stderr to something the session can show
280 log::error!("kernel: {}", line);
281 }
282 })
283 .detach();
284
285 let stdout = kernel.process.stdout.take();
286
287 cx.spawn(|_session, mut _cx| async move {
288 if stdout.is_none() {
289 return;
290 }
291 let reader = BufReader::new(stdout.unwrap());
292 let mut lines = reader.lines();
293 while let Some(Ok(line)) = lines.next().await {
294 log::info!("kernel: {}", line);
295 }
296 })
297 .detach();
298
299 let status = kernel.process.status();
300 session.kernel(Kernel::RunningKernel(Box::new(kernel)), cx);
301
302 let process_status_task = cx.spawn(|session, mut cx| async move {
303 let error_message = match status.await {
304 Ok(status) => {
305 if status.success() {
306 log::info!("kernel process exited successfully");
307 return;
308 }
309
310 format!("kernel process exited with status: {:?}", status)
311 }
312 Err(err) => {
313 format!("kernel process exited with error: {:?}", err)
314 }
315 };
316
317 log::error!("{}", error_message);
318
319 session
320 .update(&mut cx, |session, cx| {
321 session.kernel(
322 Kernel::ErroredLaunch(error_message.clone()),
323 cx,
324 );
325
326 session.blocks.values().for_each(|block| {
327 block.execution_view.update(
328 cx,
329 |execution_view, cx| {
330 match execution_view.status {
331 ExecutionStatus::Finished => {
332 // Do nothing when the output was good
333 }
334 _ => {
335 // All other cases, set the status to errored
336 execution_view.status =
337 ExecutionStatus::KernelErrored(
338 error_message.clone(),
339 )
340 }
341 }
342 cx.notify();
343 },
344 );
345 });
346
347 cx.notify();
348 })
349 .ok();
350 });
351
352 session.process_status_task = Some(process_status_task);
353
354 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
355 while let Some(message) = messages_rx.next().await {
356 session
357 .update(&mut cx, |session, cx| {
358 session.route(&message, cx);
359 })
360 .ok();
361 }
362 }));
363
364 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
365 // cx.spawn(|this, mut cx| async move {
366 // cx.background_executor()
367 // .timer(Duration::from_millis(120))
368 // .await;
369 // this.update(&mut cx, |this, cx| {
370 // this.send(KernelInfoRequest {}.into(), cx).ok();
371 // })
372 // .ok();
373 // })
374 // .detach();
375 })
376 .ok();
377 }
378 Err(err) => {
379 this.update(&mut cx, |session, cx| {
380 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
381 })
382 .ok();
383 }
384 }
385 })
386 .shared();
387
388 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
389 cx.notify();
390 }
391
392 fn on_buffer_event(
393 &mut self,
394 buffer: Model<MultiBuffer>,
395 event: &multi_buffer::Event,
396 cx: &mut ViewContext<Self>,
397 ) {
398 if let multi_buffer::Event::Edited { .. } = event {
399 let snapshot = buffer.read(cx).snapshot(cx);
400
401 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
402
403 self.blocks.retain(|_id, block| {
404 if block.invalidation_anchor.is_valid(&snapshot) {
405 true
406 } else {
407 blocks_to_remove.insert(block.block_id);
408 false
409 }
410 });
411
412 if !blocks_to_remove.is_empty() {
413 self.editor
414 .update(cx, |editor, cx| {
415 editor.remove_blocks(blocks_to_remove, None, cx);
416 })
417 .ok();
418 cx.notify();
419 }
420 }
421 }
422
423 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
424 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
425 kernel.request_tx().try_send(message).ok();
426 }
427
428 anyhow::Ok(())
429 }
430
431 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
432 let blocks_to_remove: HashSet<CustomBlockId> =
433 self.blocks.values().map(|block| block.block_id).collect();
434
435 self.editor
436 .update(cx, |editor, cx| {
437 editor.remove_blocks(blocks_to_remove, None, cx);
438 })
439 .ok();
440
441 self.blocks.clear();
442 }
443
444 pub fn execute(
445 &mut self,
446 code: String,
447 anchor_range: Range<Anchor>,
448 next_cell: Option<Anchor>,
449 move_down: bool,
450 cx: &mut ViewContext<Self>,
451 ) {
452 let Some(editor) = self.editor.upgrade() else {
453 return;
454 };
455
456 if code.is_empty() {
457 return;
458 }
459
460 let execute_request = ExecuteRequest {
461 code,
462 ..ExecuteRequest::default()
463 };
464
465 let message: JupyterMessage = execute_request.into();
466
467 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
468
469 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
470
471 self.blocks.retain(|_key, block| {
472 if anchor_range.overlaps(&block.code_range, &buffer) {
473 blocks_to_remove.insert(block.block_id);
474 false
475 } else {
476 true
477 }
478 });
479
480 self.editor
481 .update(cx, |editor, cx| {
482 editor.remove_blocks(blocks_to_remove, None, cx);
483 })
484 .ok();
485
486 let status = match &self.kernel {
487 Kernel::Restarting => ExecutionStatus::Restarting,
488 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
489 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
490 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
491 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
492 Kernel::Shutdown => ExecutionStatus::Shutdown,
493 };
494
495 let parent_message_id = message.header.msg_id.clone();
496 let session_view = cx.view().downgrade();
497 let weak_editor = self.editor.clone();
498
499 let on_close: CloseBlockFn =
500 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
501 if let Some(session) = session_view.upgrade() {
502 session.update(cx, |session, cx| {
503 session.blocks.remove(&parent_message_id);
504 cx.notify();
505 });
506 }
507
508 if let Some(editor) = weak_editor.upgrade() {
509 editor.update(cx, |editor, cx| {
510 let mut block_ids = HashSet::default();
511 block_ids.insert(block_id);
512 editor.remove_blocks(block_ids, None, cx);
513 });
514 }
515 });
516
517 let Ok(editor_block) =
518 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
519 else {
520 return;
521 };
522
523 let new_cursor_pos = if let Some(next_cursor) = next_cell {
524 next_cursor
525 } else {
526 editor_block.invalidation_anchor
527 };
528
529 self.blocks
530 .insert(message.header.msg_id.clone(), editor_block);
531
532 match &self.kernel {
533 Kernel::RunningKernel(_) => {
534 self.send(message, cx).ok();
535 }
536 Kernel::StartingKernel(task) => {
537 // Queue up the execution as a task to run after the kernel starts
538 let task = task.clone();
539 let message = message.clone();
540
541 cx.spawn(|this, mut cx| async move {
542 task.await;
543 this.update(&mut cx, |session, cx| {
544 session.send(message, cx).ok();
545 })
546 .ok();
547 })
548 .detach();
549 }
550 _ => {}
551 }
552
553 if move_down {
554 editor.update(cx, move |editor, cx| {
555 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
556 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
557 });
558 });
559 }
560 }
561
562 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
563 let parent_message_id = match message.parent_header.as_ref() {
564 Some(header) => &header.msg_id,
565 None => return,
566 };
567
568 match &message.content {
569 JupyterMessageContent::Status(status) => {
570 self.kernel.set_execution_state(&status.execution_state);
571
572 self.telemetry.report_repl_event(
573 self.kernel_specification.language().into(),
574 KernelStatus::from(&self.kernel).to_string(),
575 cx.entity_id().to_string(),
576 );
577
578 cx.notify();
579 }
580 JupyterMessageContent::KernelInfoReply(reply) => {
581 self.kernel.set_kernel_info(reply);
582 cx.notify();
583 }
584 JupyterMessageContent::UpdateDisplayData(update) => {
585 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
586 display_id
587 } else {
588 return;
589 };
590
591 self.blocks.iter_mut().for_each(|(_, block)| {
592 block.execution_view.update(cx, |execution_view, cx| {
593 execution_view.update_display_data(&update.data, &display_id, cx);
594 });
595 });
596 return;
597 }
598 _ => {}
599 }
600
601 if let Some(block) = self.blocks.get_mut(parent_message_id) {
602 block.handle_message(message, cx);
603 }
604 }
605
606 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
607 match &mut self.kernel {
608 Kernel::RunningKernel(_kernel) => {
609 self.send(InterruptRequest {}.into(), cx).ok();
610 }
611 Kernel::StartingKernel(_task) => {
612 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
613 }
614 _ => {}
615 }
616 }
617
618 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
619 if let Kernel::Shutdown = kernel {
620 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
621 }
622
623 let kernel_status = KernelStatus::from(&kernel).to_string();
624 let kernel_language = self.kernel_specification.language().into();
625
626 self.telemetry.report_repl_event(
627 kernel_language,
628 kernel_status,
629 cx.entity_id().to_string(),
630 );
631
632 self.kernel = kernel;
633 }
634
635 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
636 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
637
638 match kernel {
639 Kernel::RunningKernel(mut kernel) => {
640 let mut request_tx = kernel.request_tx().clone();
641
642 cx.spawn(|this, mut cx| async move {
643 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
644 request_tx.try_send(message).ok();
645
646 // Give the kernel a bit of time to clean up
647 cx.background_executor().timer(Duration::from_secs(3)).await;
648
649 this.update(&mut cx, |session, _cx| {
650 session.messaging_task.take();
651 session.process_status_task.take();
652 })
653 .ok();
654
655 kernel.force_shutdown().ok();
656
657 this.update(&mut cx, |session, cx| {
658 session.clear_outputs(cx);
659 session.kernel(Kernel::Shutdown, cx);
660 cx.notify();
661 })
662 .ok();
663 })
664 .detach();
665 }
666 _ => {
667 self.messaging_task.take();
668 self.process_status_task.take();
669 self.kernel(Kernel::Shutdown, cx);
670 }
671 }
672 cx.notify();
673 }
674
675 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
676 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
677
678 match kernel {
679 Kernel::Restarting => {
680 // Do nothing if already restarting
681 }
682 Kernel::RunningKernel(mut kernel) => {
683 let mut request_tx = kernel.request_tx().clone();
684
685 cx.spawn(|this, mut cx| async move {
686 // Send shutdown request with restart flag
687 log::debug!("restarting kernel");
688 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
689 request_tx.try_send(message).ok();
690
691 this.update(&mut cx, |session, _cx| {
692 session.messaging_task.take();
693 session.process_status_task.take();
694 })
695 .ok();
696
697 // Wait for kernel to shutdown
698 cx.background_executor().timer(Duration::from_secs(1)).await;
699
700 // Force kill the kernel if it hasn't shut down
701 kernel.force_shutdown().ok();
702
703 // Start a new kernel
704 this.update(&mut cx, |session, cx| {
705 // todo!(): Differentiate between restart and restart+clear-outputs
706 session.clear_outputs(cx);
707 session.start_kernel(cx);
708 })
709 .ok();
710 })
711 .detach();
712 }
713 _ => {
714 // If it's not already running, we can just clean up and start a new kernel
715 self.messaging_task.take();
716 self.process_status_task.take();
717 self.clear_outputs(cx);
718 self.start_kernel(cx);
719 }
720 }
721 cx.notify();
722 }
723}
724
725pub enum SessionEvent {
726 Shutdown(WeakView<Editor>),
727}
728
729impl EventEmitter<SessionEvent> for Session {}
730
731impl Render for Session {
732 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
733 let (status_text, interrupt_button) = match &self.kernel {
734 Kernel::RunningKernel(kernel) => (
735 kernel
736 .kernel_info()
737 .as_ref()
738 .map(|info| info.language_info.name.clone()),
739 Some(
740 Button::new("interrupt", "Interrupt")
741 .style(ButtonStyle::Subtle)
742 .on_click(cx.listener(move |session, _, cx| {
743 session.interrupt(cx);
744 })),
745 ),
746 ),
747 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
748 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
749 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
750 Kernel::Shutdown => (Some("Shutdown".into()), None),
751 Kernel::Restarting => (Some("Restarting".into()), None),
752 };
753
754 KernelListItem::new(self.kernel_specification.clone())
755 .status_color(match &self.kernel {
756 Kernel::RunningKernel(kernel) => match kernel.execution_state() {
757 ExecutionState::Idle => Color::Success,
758 ExecutionState::Busy => Color::Modified,
759 },
760 Kernel::StartingKernel(_) => Color::Modified,
761 Kernel::ErroredLaunch(_) => Color::Error,
762 Kernel::ShuttingDown => Color::Modified,
763 Kernel::Shutdown => Color::Disabled,
764 Kernel::Restarting => Color::Modified,
765 })
766 .child(Label::new(self.kernel_specification.name()))
767 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
768 .button(
769 Button::new("shutdown", "Shutdown")
770 .style(ButtonStyle::Subtle)
771 .disabled(self.kernel.is_shutting_down())
772 .on_click(cx.listener(move |session, _, cx| {
773 session.shutdown(cx);
774 })),
775 )
776 .buttons(interrupt_button)
777 }
778}