1use crate::components::KernelListItem;
2use crate::setup_editor_session_actions;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6 KernelStatus,
7};
8use client::telemetry::Telemetry;
9use collections::{HashMap, HashSet};
10use editor::{
11 display_map::{
12 BlockContext, BlockId, BlockPlacement, BlockProperties, BlockStyle, CustomBlockId,
13 RenderBlock,
14 },
15 scroll::Autoscroll,
16 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
17};
18use futures::io::BufReader;
19use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
20use gpui::{
21 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Option<Task<()>>,
39 process_status_task: Option<Task<()>>,
40 pub kernel_specification: KernelSpecification,
41 telemetry: Arc<Telemetry>,
42 _buffer_subscription: Subscription,
43}
44
45struct EditorBlock {
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50}
51
52type CloseBlockFn =
53 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let editor = editor
64 .upgrade()
65 .ok_or_else(|| anyhow::anyhow!("editor is not open"))?;
66 let workspace = editor
67 .read(cx)
68 .workspace()
69 .ok_or_else(|| anyhow::anyhow!("workspace dropped"))?;
70
71 let execution_view =
72 cx.new_view(|cx| ExecutionView::new(status, workspace.downgrade(), cx));
73
74 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
75 let buffer = editor.buffer().clone();
76 let buffer_snapshot = buffer.read(cx).snapshot(cx);
77 let end_point = code_range.end.to_point(&buffer_snapshot);
78 let next_row_start = end_point + Point::new(1, 0);
79 if next_row_start > buffer_snapshot.max_point() {
80 buffer.update(cx, |buffer, cx| {
81 buffer.edit(
82 [(
83 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
84 "\n",
85 )],
86 None,
87 cx,
88 )
89 });
90 }
91
92 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
93 let block = BlockProperties {
94 placement: BlockPlacement::Below(code_range.end),
95 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
96 height: 1,
97 style: BlockStyle::Sticky,
98 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
99 priority: 0,
100 };
101
102 let block_id = editor.insert_blocks([block], None, cx)[0];
103 (block_id, invalidation_anchor)
104 });
105
106 anyhow::Ok(Self {
107 code_range,
108 invalidation_anchor,
109 block_id,
110 execution_view,
111 })
112 }
113
114 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
115 self.execution_view.update(cx, |execution_view, cx| {
116 execution_view.push_message(&message.content, cx);
117 });
118 }
119
120 fn create_output_area_renderer(
121 execution_view: View<ExecutionView>,
122 on_close: CloseBlockFn,
123 ) -> RenderBlock {
124 let render = move |cx: &mut BlockContext| {
125 let execution_view = execution_view.clone();
126 let text_style = crate::outputs::plain::text_style(cx);
127
128 let gutter = cx.gutter_dimensions;
129
130 let block_id = cx.block_id;
131 let on_close = on_close.clone();
132
133 let rem_size = cx.rem_size();
134
135 let text_line_height = text_style.line_height_in_pixels(rem_size);
136
137 let close_button = h_flex()
138 .flex_none()
139 .items_center()
140 .justify_center()
141 .absolute()
142 .top(text_line_height / 2.)
143 .right(
144 // 2px is a magic number to nudge the button just a bit closer to
145 // the line number start
146 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
147 )
148 .w(text_line_height)
149 .h(text_line_height)
150 .child(
151 IconButton::new("close_output_area", IconName::Close)
152 .icon_size(IconSize::Small)
153 .icon_color(Color::Muted)
154 .size(ButtonSize::Compact)
155 .shape(IconButtonShape::Square)
156 .tooltip(|cx| Tooltip::text("Close output area", cx))
157 .on_click(move |_, cx| {
158 if let BlockId::Custom(block_id) = block_id {
159 (on_close)(block_id, cx)
160 }
161 }),
162 );
163
164 div()
165 .id(cx.block_id)
166 .flex()
167 .items_start()
168 .min_h(text_line_height)
169 .w_full()
170 .border_y_1()
171 .border_color(cx.theme().colors().border)
172 .bg(cx.theme().colors().background)
173 .child(
174 div()
175 .relative()
176 .w(gutter.full_width())
177 .h(text_line_height * 2)
178 .child(close_button),
179 )
180 .child(
181 div()
182 .flex_1()
183 .size_full()
184 .py(text_line_height / 2.)
185 .mr(gutter.width)
186 .child(execution_view),
187 )
188 .into_any_element()
189 };
190
191 Box::new(render)
192 }
193}
194
195impl Session {
196 pub fn new(
197 editor: WeakView<Editor>,
198 fs: Arc<dyn Fs>,
199 telemetry: Arc<Telemetry>,
200 kernel_specification: KernelSpecification,
201 cx: &mut ViewContext<Self>,
202 ) -> Self {
203 let subscription = match editor.upgrade() {
204 Some(editor) => {
205 let buffer = editor.read(cx).buffer().clone();
206 cx.subscribe(&buffer, Self::on_buffer_event)
207 }
208 None => Subscription::new(|| {}),
209 };
210
211 let editor_handle = editor.clone();
212
213 editor
214 .update(cx, |editor, _cx| {
215 setup_editor_session_actions(editor, editor_handle);
216 })
217 .ok();
218
219 let mut session = Self {
220 fs,
221 editor,
222 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
223 messaging_task: None,
224 process_status_task: None,
225 blocks: HashMap::default(),
226 kernel_specification,
227 _buffer_subscription: subscription,
228 telemetry,
229 };
230
231 session.start_kernel(cx);
232 session
233 }
234
235 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
236 let kernel_language = self.kernel_specification.language();
237 let entity_id = self.editor.entity_id();
238 let working_directory = self
239 .editor
240 .upgrade()
241 .and_then(|editor| editor.read(cx).working_directory(cx))
242 .unwrap_or_else(temp_dir);
243
244 self.telemetry.report_repl_event(
245 kernel_language.into(),
246 KernelStatus::Starting.to_string(),
247 cx.entity_id().to_string(),
248 );
249
250 let kernel = RunningKernel::new(
251 self.kernel_specification.clone(),
252 entity_id,
253 working_directory,
254 self.fs.clone(),
255 cx,
256 );
257
258 let pending_kernel = cx
259 .spawn(|this, mut cx| async move {
260 let kernel = kernel.await;
261
262 match kernel {
263 Ok((mut kernel, mut messages_rx)) => {
264 this.update(&mut cx, |session, cx| {
265 let stderr = kernel.process.stderr.take();
266
267 cx.spawn(|_session, mut _cx| async move {
268 if stderr.is_none() {
269 return;
270 }
271 let reader = BufReader::new(stderr.unwrap());
272 let mut lines = reader.lines();
273 while let Some(Ok(line)) = lines.next().await {
274 // todo!(): Log stdout and stderr to something the session can show
275 log::error!("kernel: {}", line);
276 }
277 })
278 .detach();
279
280 let stdout = kernel.process.stdout.take();
281
282 cx.spawn(|_session, mut _cx| async move {
283 if stdout.is_none() {
284 return;
285 }
286 let reader = BufReader::new(stdout.unwrap());
287 let mut lines = reader.lines();
288 while let Some(Ok(line)) = lines.next().await {
289 log::info!("kernel: {}", line);
290 }
291 })
292 .detach();
293
294 let status = kernel.process.status();
295 session.kernel(Kernel::RunningKernel(kernel), cx);
296
297 let process_status_task = cx.spawn(|session, mut cx| async move {
298 let error_message = match status.await {
299 Ok(status) => {
300 if status.success() {
301 log::info!("kernel process exited successfully");
302 return;
303 }
304
305 format!("kernel process exited with status: {:?}", status)
306 }
307 Err(err) => {
308 format!("kernel process exited with error: {:?}", err)
309 }
310 };
311
312 log::error!("{}", error_message);
313
314 session
315 .update(&mut cx, |session, cx| {
316 session.kernel(
317 Kernel::ErroredLaunch(error_message.clone()),
318 cx,
319 );
320
321 session.blocks.values().for_each(|block| {
322 block.execution_view.update(
323 cx,
324 |execution_view, cx| {
325 match execution_view.status {
326 ExecutionStatus::Finished => {
327 // Do nothing when the output was good
328 }
329 _ => {
330 // All other cases, set the status to errored
331 execution_view.status =
332 ExecutionStatus::KernelErrored(
333 error_message.clone(),
334 )
335 }
336 }
337 cx.notify();
338 },
339 );
340 });
341
342 cx.notify();
343 })
344 .ok();
345 });
346
347 session.process_status_task = Some(process_status_task);
348
349 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
350 while let Some(message) = messages_rx.next().await {
351 session
352 .update(&mut cx, |session, cx| {
353 session.route(&message, cx);
354 })
355 .ok();
356 }
357 }));
358
359 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
360 // cx.spawn(|this, mut cx| async move {
361 // cx.background_executor()
362 // .timer(Duration::from_millis(120))
363 // .await;
364 // this.update(&mut cx, |this, cx| {
365 // this.send(KernelInfoRequest {}.into(), cx).ok();
366 // })
367 // .ok();
368 // })
369 // .detach();
370 })
371 .ok();
372 }
373 Err(err) => {
374 this.update(&mut cx, |session, cx| {
375 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
376 })
377 .ok();
378 }
379 }
380 })
381 .shared();
382
383 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
384 cx.notify();
385 }
386
387 fn on_buffer_event(
388 &mut self,
389 buffer: Model<MultiBuffer>,
390 event: &multi_buffer::Event,
391 cx: &mut ViewContext<Self>,
392 ) {
393 if let multi_buffer::Event::Edited { .. } = event {
394 let snapshot = buffer.read(cx).snapshot(cx);
395
396 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
397
398 self.blocks.retain(|_id, block| {
399 if block.invalidation_anchor.is_valid(&snapshot) {
400 true
401 } else {
402 blocks_to_remove.insert(block.block_id);
403 false
404 }
405 });
406
407 if !blocks_to_remove.is_empty() {
408 self.editor
409 .update(cx, |editor, cx| {
410 editor.remove_blocks(blocks_to_remove, None, cx);
411 })
412 .ok();
413 cx.notify();
414 }
415 }
416 }
417
418 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
419 if let Kernel::RunningKernel(kernel) = &mut self.kernel {
420 kernel.request_tx.try_send(message).ok();
421 }
422
423 anyhow::Ok(())
424 }
425
426 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
427 let blocks_to_remove: HashSet<CustomBlockId> =
428 self.blocks.values().map(|block| block.block_id).collect();
429
430 self.editor
431 .update(cx, |editor, cx| {
432 editor.remove_blocks(blocks_to_remove, None, cx);
433 })
434 .ok();
435
436 self.blocks.clear();
437 }
438
439 pub fn execute(
440 &mut self,
441 code: String,
442 anchor_range: Range<Anchor>,
443 next_cell: Option<Anchor>,
444 move_down: bool,
445 cx: &mut ViewContext<Self>,
446 ) {
447 let Some(editor) = self.editor.upgrade() else {
448 return;
449 };
450
451 if code.is_empty() {
452 return;
453 }
454
455 let execute_request = ExecuteRequest {
456 code,
457 ..ExecuteRequest::default()
458 };
459
460 let message: JupyterMessage = execute_request.into();
461
462 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
463
464 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
465
466 self.blocks.retain(|_key, block| {
467 if anchor_range.overlaps(&block.code_range, &buffer) {
468 blocks_to_remove.insert(block.block_id);
469 false
470 } else {
471 true
472 }
473 });
474
475 self.editor
476 .update(cx, |editor, cx| {
477 editor.remove_blocks(blocks_to_remove, None, cx);
478 })
479 .ok();
480
481 let status = match &self.kernel {
482 Kernel::Restarting => ExecutionStatus::Restarting,
483 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
484 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
485 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
486 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
487 Kernel::Shutdown => ExecutionStatus::Shutdown,
488 };
489
490 let parent_message_id = message.header.msg_id.clone();
491 let session_view = cx.view().downgrade();
492 let weak_editor = self.editor.clone();
493
494 let on_close: CloseBlockFn =
495 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
496 if let Some(session) = session_view.upgrade() {
497 session.update(cx, |session, cx| {
498 session.blocks.remove(&parent_message_id);
499 cx.notify();
500 });
501 }
502
503 if let Some(editor) = weak_editor.upgrade() {
504 editor.update(cx, |editor, cx| {
505 let mut block_ids = HashSet::default();
506 block_ids.insert(block_id);
507 editor.remove_blocks(block_ids, None, cx);
508 });
509 }
510 });
511
512 let Ok(editor_block) =
513 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
514 else {
515 return;
516 };
517
518 let new_cursor_pos = if let Some(next_cursor) = next_cell {
519 next_cursor
520 } else {
521 editor_block.invalidation_anchor
522 };
523
524 self.blocks
525 .insert(message.header.msg_id.clone(), editor_block);
526
527 match &self.kernel {
528 Kernel::RunningKernel(_) => {
529 self.send(message, cx).ok();
530 }
531 Kernel::StartingKernel(task) => {
532 // Queue up the execution as a task to run after the kernel starts
533 let task = task.clone();
534 let message = message.clone();
535
536 cx.spawn(|this, mut cx| async move {
537 task.await;
538 this.update(&mut cx, |session, cx| {
539 session.send(message, cx).ok();
540 })
541 .ok();
542 })
543 .detach();
544 }
545 _ => {}
546 }
547
548 if move_down {
549 editor.update(cx, move |editor, cx| {
550 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
551 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
552 });
553 });
554 }
555 }
556
557 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
558 let parent_message_id = match message.parent_header.as_ref() {
559 Some(header) => &header.msg_id,
560 None => return,
561 };
562
563 match &message.content {
564 JupyterMessageContent::Status(status) => {
565 self.kernel.set_execution_state(&status.execution_state);
566
567 self.telemetry.report_repl_event(
568 self.kernel_specification.language().into(),
569 KernelStatus::from(&self.kernel).to_string(),
570 cx.entity_id().to_string(),
571 );
572
573 cx.notify();
574 }
575 JupyterMessageContent::KernelInfoReply(reply) => {
576 self.kernel.set_kernel_info(reply);
577 cx.notify();
578 }
579 JupyterMessageContent::UpdateDisplayData(update) => {
580 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
581 display_id
582 } else {
583 return;
584 };
585
586 self.blocks.iter_mut().for_each(|(_, block)| {
587 block.execution_view.update(cx, |execution_view, cx| {
588 execution_view.update_display_data(&update.data, &display_id, cx);
589 });
590 });
591 return;
592 }
593 _ => {}
594 }
595
596 if let Some(block) = self.blocks.get_mut(parent_message_id) {
597 block.handle_message(message, cx);
598 }
599 }
600
601 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
602 match &mut self.kernel {
603 Kernel::RunningKernel(_kernel) => {
604 self.send(InterruptRequest {}.into(), cx).ok();
605 }
606 Kernel::StartingKernel(_task) => {
607 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
608 }
609 _ => {}
610 }
611 }
612
613 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
614 if let Kernel::Shutdown = kernel {
615 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
616 }
617
618 let kernel_status = KernelStatus::from(&kernel).to_string();
619 let kernel_language = self.kernel_specification.language().into();
620
621 self.telemetry.report_repl_event(
622 kernel_language,
623 kernel_status,
624 cx.entity_id().to_string(),
625 );
626
627 self.kernel = kernel;
628 }
629
630 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
631 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
632
633 match kernel {
634 Kernel::RunningKernel(mut kernel) => {
635 let mut request_tx = kernel.request_tx.clone();
636
637 cx.spawn(|this, mut cx| async move {
638 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
639 request_tx.try_send(message).ok();
640
641 // Give the kernel a bit of time to clean up
642 cx.background_executor().timer(Duration::from_secs(3)).await;
643
644 this.update(&mut cx, |session, _cx| {
645 session.messaging_task.take();
646 session.process_status_task.take();
647 })
648 .ok();
649
650 kernel.process.kill().ok();
651
652 this.update(&mut cx, |session, cx| {
653 session.clear_outputs(cx);
654 session.kernel(Kernel::Shutdown, cx);
655 cx.notify();
656 })
657 .ok();
658 })
659 .detach();
660 }
661 _ => {
662 self.messaging_task.take();
663 self.process_status_task.take();
664 self.kernel(Kernel::Shutdown, cx);
665 }
666 }
667 cx.notify();
668 }
669
670 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
671 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
672
673 match kernel {
674 Kernel::Restarting => {
675 // Do nothing if already restarting
676 }
677 Kernel::RunningKernel(mut kernel) => {
678 let mut request_tx = kernel.request_tx.clone();
679
680 cx.spawn(|this, mut cx| async move {
681 // Send shutdown request with restart flag
682 log::debug!("restarting kernel");
683 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
684 request_tx.try_send(message).ok();
685
686 this.update(&mut cx, |session, _cx| {
687 session.messaging_task.take();
688 session.process_status_task.take();
689 })
690 .ok();
691
692 // Wait for kernel to shutdown
693 cx.background_executor().timer(Duration::from_secs(1)).await;
694
695 // Force kill the kernel if it hasn't shut down
696 kernel.process.kill().ok();
697
698 // Start a new kernel
699 this.update(&mut cx, |session, cx| {
700 // todo!(): Differentiate between restart and restart+clear-outputs
701 session.clear_outputs(cx);
702 session.start_kernel(cx);
703 })
704 .ok();
705 })
706 .detach();
707 }
708 _ => {
709 // If it's not already running, we can just clean up and start a new kernel
710 self.messaging_task.take();
711 self.process_status_task.take();
712 self.clear_outputs(cx);
713 self.start_kernel(cx);
714 }
715 }
716 cx.notify();
717 }
718}
719
720pub enum SessionEvent {
721 Shutdown(WeakView<Editor>),
722}
723
724impl EventEmitter<SessionEvent> for Session {}
725
726impl Render for Session {
727 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
728 let (status_text, interrupt_button) = match &self.kernel {
729 Kernel::RunningKernel(kernel) => (
730 kernel
731 .kernel_info
732 .as_ref()
733 .map(|info| info.language_info.name.clone()),
734 Some(
735 Button::new("interrupt", "Interrupt")
736 .style(ButtonStyle::Subtle)
737 .on_click(cx.listener(move |session, _, cx| {
738 session.interrupt(cx);
739 })),
740 ),
741 ),
742 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
743 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
744 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
745 Kernel::Shutdown => (Some("Shutdown".into()), None),
746 Kernel::Restarting => (Some("Restarting".into()), None),
747 };
748
749 KernelListItem::new(self.kernel_specification.clone())
750 .status_color(match &self.kernel {
751 Kernel::RunningKernel(kernel) => match kernel.execution_state {
752 ExecutionState::Idle => Color::Success,
753 ExecutionState::Busy => Color::Modified,
754 },
755 Kernel::StartingKernel(_) => Color::Modified,
756 Kernel::ErroredLaunch(_) => Color::Error,
757 Kernel::ShuttingDown => Color::Modified,
758 Kernel::Shutdown => Color::Disabled,
759 Kernel::Restarting => Color::Modified,
760 })
761 .child(Label::new(self.kernel_specification.name()))
762 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
763 .button(
764 Button::new("shutdown", "Shutdown")
765 .style(ButtonStyle::Subtle)
766 .disabled(self.kernel.is_shutting_down())
767 .on_click(cx.listener(move |session, _, cx| {
768 session.shutdown(cx);
769 })),
770 )
771 .buttons(interrupt_button)
772 }
773}