1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 fs: Arc<dyn Fs>,
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Option<Task<()>>,
39 process_status_task: Option<Task<()>>,
40 pub kernel_specification: KernelSpecification,
41 telemetry: Arc<Telemetry>,
42 _buffer_subscription: Subscription,
43}
44
45struct EditorBlock {
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50}
51
52type CloseBlockFn =
53 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
64
65 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
66 let buffer = editor.buffer().clone();
67 let buffer_snapshot = buffer.read(cx).snapshot(cx);
68 let end_point = code_range.end.to_point(&buffer_snapshot);
69 let next_row_start = end_point + Point::new(1, 0);
70 if next_row_start > buffer_snapshot.max_point() {
71 buffer.update(cx, |buffer, cx| {
72 buffer.edit(
73 [(
74 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
75 "\n",
76 )],
77 None,
78 cx,
79 )
80 });
81 }
82
83 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
84 let block = BlockProperties {
85 position: code_range.end,
86 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
87 height: 1,
88 style: BlockStyle::Sticky,
89 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
90 disposition: BlockDisposition::Below,
91 priority: 0,
92 };
93
94 let block_id = editor.insert_blocks([block], None, cx)[0];
95 (block_id, invalidation_anchor)
96 })?;
97
98 anyhow::Ok(Self {
99 code_range,
100 invalidation_anchor,
101 block_id,
102 execution_view,
103 })
104 }
105
106 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
107 self.execution_view.update(cx, |execution_view, cx| {
108 execution_view.push_message(&message.content, cx);
109 });
110 }
111
112 fn create_output_area_renderer(
113 execution_view: View<ExecutionView>,
114 on_close: CloseBlockFn,
115 ) -> RenderBlock {
116 let render = move |cx: &mut BlockContext| {
117 let execution_view = execution_view.clone();
118 let text_style = crate::outputs::plain::text_style(cx);
119
120 let gutter = cx.gutter_dimensions;
121
122 let block_id = cx.block_id;
123 let on_close = on_close.clone();
124
125 let rem_size = cx.rem_size();
126
127 let text_line_height = text_style.line_height_in_pixels(rem_size);
128
129 let close_button = h_flex()
130 .flex_none()
131 .items_center()
132 .justify_center()
133 .absolute()
134 .top(text_line_height / 2.)
135 .right(
136 // 2px is a magic number to nudge the button just a bit closer to
137 // the line number start
138 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
139 )
140 .w(text_line_height)
141 .h(text_line_height)
142 .child(
143 IconButton::new(
144 ("close_output_area", EntityId::from(cx.block_id)),
145 IconName::Close,
146 )
147 .icon_size(IconSize::Small)
148 .icon_color(Color::Muted)
149 .size(ButtonSize::Compact)
150 .shape(IconButtonShape::Square)
151 .tooltip(|cx| Tooltip::text("Close output area", cx))
152 .on_click(move |_, cx| {
153 if let BlockId::Custom(block_id) = block_id {
154 (on_close)(block_id, cx)
155 }
156 }),
157 );
158
159 div()
160 .flex()
161 .items_start()
162 .min_h(text_line_height)
163 .w_full()
164 .border_y_1()
165 .border_color(cx.theme().colors().border)
166 .bg(cx.theme().colors().background)
167 .child(
168 div()
169 .relative()
170 .w(gutter.full_width())
171 .h(text_line_height * 2)
172 .child(close_button),
173 )
174 .child(
175 div()
176 .flex_1()
177 .size_full()
178 .py(text_line_height / 2.)
179 .mr(gutter.width)
180 .child(execution_view),
181 )
182 .into_any_element()
183 };
184
185 Box::new(render)
186 }
187}
188
189impl Session {
190 pub fn new(
191 editor: WeakView<Editor>,
192 fs: Arc<dyn Fs>,
193 telemetry: Arc<Telemetry>,
194 kernel_specification: KernelSpecification,
195 cx: &mut ViewContext<Self>,
196 ) -> Self {
197 let subscription = match editor.upgrade() {
198 Some(editor) => {
199 let buffer = editor.read(cx).buffer().clone();
200 cx.subscribe(&buffer, Self::on_buffer_event)
201 }
202 None => Subscription::new(|| {}),
203 };
204
205 let mut session = Self {
206 fs,
207 editor,
208 kernel: Kernel::StartingKernel(Task::ready(()).shared()),
209 messaging_task: None,
210 process_status_task: None,
211 blocks: HashMap::default(),
212 kernel_specification,
213 _buffer_subscription: subscription,
214 telemetry,
215 };
216
217 session.start_kernel(cx);
218 session
219 }
220
221 fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
222 let kernel_language = self.kernel_specification.kernelspec.language.clone();
223 let entity_id = self.editor.entity_id();
224 let working_directory = self
225 .editor
226 .upgrade()
227 .and_then(|editor| editor.read(cx).working_directory(cx))
228 .unwrap_or_else(temp_dir);
229
230 self.telemetry.report_repl_event(
231 kernel_language.clone(),
232 KernelStatus::Starting.to_string(),
233 cx.entity_id().to_string(),
234 );
235
236 let kernel = RunningKernel::new(
237 self.kernel_specification.clone(),
238 entity_id,
239 working_directory,
240 self.fs.clone(),
241 cx,
242 );
243
244 let pending_kernel = cx
245 .spawn(|this, mut cx| async move {
246 let kernel = kernel.await;
247
248 match kernel {
249 Ok((mut kernel, mut messages_rx)) => {
250 this.update(&mut cx, |session, cx| {
251 let stderr = kernel.process.stderr.take();
252
253 cx.spawn(|_session, mut _cx| async move {
254 if let None = stderr {
255 return;
256 }
257 let reader = BufReader::new(stderr.unwrap());
258 let mut lines = reader.lines();
259 while let Some(Ok(line)) = lines.next().await {
260 // todo!(): Log stdout and stderr to something the session can show
261 log::error!("kernel: {}", line);
262 }
263 })
264 .detach();
265
266 let stdout = kernel.process.stdout.take();
267
268 cx.spawn(|_session, mut _cx| async move {
269 if let None = stdout {
270 return;
271 }
272 let reader = BufReader::new(stdout.unwrap());
273 let mut lines = reader.lines();
274 while let Some(Ok(line)) = lines.next().await {
275 log::info!("kernel: {}", line);
276 }
277 })
278 .detach();
279
280 let status = kernel.process.status();
281 session.kernel(Kernel::RunningKernel(kernel), cx);
282
283 let process_status_task = cx.spawn(|session, mut cx| async move {
284 let error_message = match status.await {
285 Ok(status) => {
286 if status.success() {
287 log::info!("kernel process exited successfully");
288 return;
289 }
290
291 format!("kernel process exited with status: {:?}", status)
292 }
293 Err(err) => {
294 format!("kernel process exited with error: {:?}", err)
295 }
296 };
297
298 log::error!("{}", error_message);
299
300 session
301 .update(&mut cx, |session, cx| {
302 session.kernel(
303 Kernel::ErroredLaunch(error_message.clone()),
304 cx,
305 );
306
307 session.blocks.values().for_each(|block| {
308 block.execution_view.update(
309 cx,
310 |execution_view, cx| {
311 match execution_view.status {
312 ExecutionStatus::Finished => {
313 // Do nothing when the output was good
314 }
315 _ => {
316 // All other cases, set the status to errored
317 execution_view.status =
318 ExecutionStatus::KernelErrored(
319 error_message.clone(),
320 )
321 }
322 }
323 cx.notify();
324 },
325 );
326 });
327
328 cx.notify();
329 })
330 .ok();
331 });
332
333 session.process_status_task = Some(process_status_task);
334
335 session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
336 while let Some(message) = messages_rx.next().await {
337 session
338 .update(&mut cx, |session, cx| {
339 session.route(&message, cx);
340 })
341 .ok();
342 }
343 }));
344
345 // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
346 // cx.spawn(|this, mut cx| async move {
347 // cx.background_executor()
348 // .timer(Duration::from_millis(120))
349 // .await;
350 // this.update(&mut cx, |this, cx| {
351 // this.send(KernelInfoRequest {}.into(), cx).ok();
352 // })
353 // .ok();
354 // })
355 // .detach();
356 })
357 .ok();
358 }
359 Err(err) => {
360 this.update(&mut cx, |session, cx| {
361 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
362 })
363 .ok();
364 }
365 }
366 })
367 .shared();
368
369 self.kernel(Kernel::StartingKernel(pending_kernel), cx);
370 cx.notify();
371 }
372
373 fn on_buffer_event(
374 &mut self,
375 buffer: Model<MultiBuffer>,
376 event: &multi_buffer::Event,
377 cx: &mut ViewContext<Self>,
378 ) {
379 if let multi_buffer::Event::Edited { .. } = event {
380 let snapshot = buffer.read(cx).snapshot(cx);
381
382 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
383
384 self.blocks.retain(|_id, block| {
385 if block.invalidation_anchor.is_valid(&snapshot) {
386 true
387 } else {
388 blocks_to_remove.insert(block.block_id);
389 false
390 }
391 });
392
393 if !blocks_to_remove.is_empty() {
394 self.editor
395 .update(cx, |editor, cx| {
396 editor.remove_blocks(blocks_to_remove, None, cx);
397 })
398 .ok();
399 cx.notify();
400 }
401 }
402 }
403
404 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
405 match &mut self.kernel {
406 Kernel::RunningKernel(kernel) => {
407 kernel.request_tx.try_send(message).ok();
408 }
409 _ => {}
410 }
411
412 anyhow::Ok(())
413 }
414
415 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
416 let blocks_to_remove: HashSet<CustomBlockId> =
417 self.blocks.values().map(|block| block.block_id).collect();
418
419 self.editor
420 .update(cx, |editor, cx| {
421 editor.remove_blocks(blocks_to_remove, None, cx);
422 })
423 .ok();
424
425 self.blocks.clear();
426 }
427
428 pub fn execute(
429 &mut self,
430 code: String,
431 anchor_range: Range<Anchor>,
432 next_cell: Option<Anchor>,
433 move_down: bool,
434 cx: &mut ViewContext<Self>,
435 ) {
436 let Some(editor) = self.editor.upgrade() else {
437 return;
438 };
439
440 if code.is_empty() {
441 return;
442 }
443
444 let execute_request = ExecuteRequest {
445 code,
446 ..ExecuteRequest::default()
447 };
448
449 let message: JupyterMessage = execute_request.into();
450
451 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
452
453 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
454
455 self.blocks.retain(|_key, block| {
456 if anchor_range.overlaps(&block.code_range, &buffer) {
457 blocks_to_remove.insert(block.block_id);
458 false
459 } else {
460 true
461 }
462 });
463
464 self.editor
465 .update(cx, |editor, cx| {
466 editor.remove_blocks(blocks_to_remove, None, cx);
467 })
468 .ok();
469
470 let status = match &self.kernel {
471 Kernel::Restarting => ExecutionStatus::Restarting,
472 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
473 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
474 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
475 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
476 Kernel::Shutdown => ExecutionStatus::Shutdown,
477 };
478
479 let parent_message_id = message.header.msg_id.clone();
480 let session_view = cx.view().downgrade();
481 let weak_editor = self.editor.clone();
482
483 let on_close: CloseBlockFn =
484 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
485 if let Some(session) = session_view.upgrade() {
486 session.update(cx, |session, cx| {
487 session.blocks.remove(&parent_message_id);
488 cx.notify();
489 });
490 }
491
492 if let Some(editor) = weak_editor.upgrade() {
493 editor.update(cx, |editor, cx| {
494 let mut block_ids = HashSet::default();
495 block_ids.insert(block_id);
496 editor.remove_blocks(block_ids, None, cx);
497 });
498 }
499 });
500
501 let Ok(editor_block) =
502 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
503 else {
504 return;
505 };
506
507 let new_cursor_pos = if let Some(next_cursor) = next_cell {
508 next_cursor
509 } else {
510 editor_block.invalidation_anchor
511 };
512
513 self.blocks
514 .insert(message.header.msg_id.clone(), editor_block);
515
516 match &self.kernel {
517 Kernel::RunningKernel(_) => {
518 self.send(message, cx).ok();
519 }
520 Kernel::StartingKernel(task) => {
521 // Queue up the execution as a task to run after the kernel starts
522 let task = task.clone();
523 let message = message.clone();
524
525 cx.spawn(|this, mut cx| async move {
526 task.await;
527 this.update(&mut cx, |session, cx| {
528 session.send(message, cx).ok();
529 })
530 .ok();
531 })
532 .detach();
533 }
534 _ => {}
535 }
536
537 if move_down {
538 editor.update(cx, move |editor, cx| {
539 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
540 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
541 });
542 });
543 }
544 }
545
546 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
547 let parent_message_id = match message.parent_header.as_ref() {
548 Some(header) => &header.msg_id,
549 None => return,
550 };
551
552 match &message.content {
553 JupyterMessageContent::Status(status) => {
554 self.kernel.set_execution_state(&status.execution_state);
555
556 self.telemetry.report_repl_event(
557 self.kernel_specification.kernelspec.language.clone(),
558 KernelStatus::from(&self.kernel).to_string(),
559 cx.entity_id().to_string(),
560 );
561
562 cx.notify();
563 }
564 JupyterMessageContent::KernelInfoReply(reply) => {
565 self.kernel.set_kernel_info(&reply);
566 cx.notify();
567 }
568 JupyterMessageContent::UpdateDisplayData(update) => {
569 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
570 display_id
571 } else {
572 return;
573 };
574
575 self.blocks.iter_mut().for_each(|(_, block)| {
576 block.execution_view.update(cx, |execution_view, cx| {
577 execution_view.update_display_data(&update.data, &display_id, cx);
578 });
579 });
580 return;
581 }
582 _ => {}
583 }
584
585 if let Some(block) = self.blocks.get_mut(parent_message_id) {
586 block.handle_message(&message, cx);
587 return;
588 }
589 }
590
591 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
592 match &mut self.kernel {
593 Kernel::RunningKernel(_kernel) => {
594 self.send(InterruptRequest {}.into(), cx).ok();
595 }
596 Kernel::StartingKernel(_task) => {
597 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
598 }
599 _ => {}
600 }
601 }
602
603 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
604 if let Kernel::Shutdown = kernel {
605 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
606 }
607
608 let kernel_status = KernelStatus::from(&kernel).to_string();
609 let kernel_language = self.kernel_specification.kernelspec.language.clone();
610
611 self.telemetry.report_repl_event(
612 kernel_language,
613 kernel_status,
614 cx.entity_id().to_string(),
615 );
616
617 self.kernel = kernel;
618 }
619
620 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
621 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
622
623 match kernel {
624 Kernel::RunningKernel(mut kernel) => {
625 let mut request_tx = kernel.request_tx.clone();
626
627 cx.spawn(|this, mut cx| async move {
628 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
629 request_tx.try_send(message).ok();
630
631 // Give the kernel a bit of time to clean up
632 cx.background_executor().timer(Duration::from_secs(3)).await;
633
634 this.update(&mut cx, |session, _cx| {
635 session.messaging_task.take();
636 session.process_status_task.take();
637 })
638 .ok();
639
640 kernel.process.kill().ok();
641
642 this.update(&mut cx, |session, cx| {
643 session.clear_outputs(cx);
644 session.kernel(Kernel::Shutdown, cx);
645 cx.notify();
646 })
647 .ok();
648 })
649 .detach();
650 }
651 _ => {
652 self.messaging_task.take();
653 self.process_status_task.take();
654 self.kernel(Kernel::Shutdown, cx);
655 }
656 }
657 cx.notify();
658 }
659
660 pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
661 let kernel = std::mem::replace(&mut self.kernel, Kernel::Restarting);
662
663 match kernel {
664 Kernel::Restarting => {
665 // Do nothing if already restarting
666 }
667 Kernel::RunningKernel(mut kernel) => {
668 let mut request_tx = kernel.request_tx.clone();
669
670 cx.spawn(|this, mut cx| async move {
671 // Send shutdown request with restart flag
672 log::debug!("restarting kernel");
673 let message: JupyterMessage = ShutdownRequest { restart: true }.into();
674 request_tx.try_send(message).ok();
675
676 this.update(&mut cx, |session, _cx| {
677 session.messaging_task.take();
678 session.process_status_task.take();
679 })
680 .ok();
681
682 // Wait for kernel to shutdown
683 cx.background_executor().timer(Duration::from_secs(1)).await;
684
685 // Force kill the kernel if it hasn't shut down
686 kernel.process.kill().ok();
687
688 // Start a new kernel
689 this.update(&mut cx, |session, cx| {
690 // todo!(): Differentiate between restart and restart+clear-outputs
691 session.clear_outputs(cx);
692 session.start_kernel(cx);
693 })
694 .ok();
695 })
696 .detach();
697 }
698 _ => {
699 // If it's not already running, we can just clean up and start a new kernel
700 self.messaging_task.take();
701 self.process_status_task.take();
702 self.clear_outputs(cx);
703 self.start_kernel(cx);
704 }
705 }
706 cx.notify();
707 }
708}
709
710pub enum SessionEvent {
711 Shutdown(WeakView<Editor>),
712}
713
714impl EventEmitter<SessionEvent> for Session {}
715
716impl Render for Session {
717 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
718 let (status_text, interrupt_button) = match &self.kernel {
719 Kernel::RunningKernel(kernel) => (
720 kernel
721 .kernel_info
722 .as_ref()
723 .map(|info| info.language_info.name.clone()),
724 Some(
725 Button::new("interrupt", "Interrupt")
726 .style(ButtonStyle::Subtle)
727 .on_click(cx.listener(move |session, _, cx| {
728 session.interrupt(cx);
729 })),
730 ),
731 ),
732 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
733 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
734 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
735 Kernel::Shutdown => (Some("Shutdown".into()), None),
736 Kernel::Restarting => (Some("Restarting".into()), None),
737 };
738
739 KernelListItem::new(self.kernel_specification.clone())
740 .status_color(match &self.kernel {
741 Kernel::RunningKernel(kernel) => match kernel.execution_state {
742 ExecutionState::Idle => Color::Success,
743 ExecutionState::Busy => Color::Modified,
744 },
745 Kernel::StartingKernel(_) => Color::Modified,
746 Kernel::ErroredLaunch(_) => Color::Error,
747 Kernel::ShuttingDown => Color::Modified,
748 Kernel::Shutdown => Color::Disabled,
749 Kernel::Restarting => Color::Modified,
750 })
751 .child(Label::new(self.kernel_specification.name.clone()))
752 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
753 .button(
754 Button::new("shutdown", "Shutdown")
755 .style(ButtonStyle::Subtle)
756 .disabled(self.kernel.is_shutting_down())
757 .on_click(cx.listener(move |session, _, cx| {
758 session.shutdown(cx);
759 })),
760 )
761 .buttons(interrupt_button)
762 }
763}