1use crate::components::KernelListItem;
2use crate::{
3 kernels::{Kernel, KernelSpecification, RunningKernel},
4 outputs::{ExecutionStatus, ExecutionView},
5};
6use crate::{stdio, KernelStatus};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::ActiveTheme;
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
33pub struct Session {
34 editor: WeakView<Editor>,
35 pub kernel: Kernel,
36 blocks: HashMap<String, EditorBlock>,
37 messaging_task: Task<()>,
38 pub kernel_specification: KernelSpecification,
39 telemetry: Arc<Telemetry>,
40 _buffer_subscription: Subscription,
41}
42
43struct EditorBlock {
44 code_range: Range<Anchor>,
45 invalidation_anchor: Anchor,
46 block_id: CustomBlockId,
47 execution_view: View<ExecutionView>,
48}
49
50type CloseBlockFn =
51 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
52
53impl EditorBlock {
54 fn new(
55 editor: WeakView<Editor>,
56 code_range: Range<Anchor>,
57 status: ExecutionStatus,
58 on_close: CloseBlockFn,
59 cx: &mut ViewContext<Session>,
60 ) -> anyhow::Result<Self> {
61 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
62
63 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
64 let buffer = editor.buffer().clone();
65 let buffer_snapshot = buffer.read(cx).snapshot(cx);
66 let end_point = code_range.end.to_point(&buffer_snapshot);
67 let next_row_start = end_point + Point::new(1, 0);
68 if next_row_start > buffer_snapshot.max_point() {
69 buffer.update(cx, |buffer, cx| {
70 buffer.edit(
71 [(
72 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
73 "\n",
74 )],
75 None,
76 cx,
77 )
78 });
79 }
80
81 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
82 let block = BlockProperties {
83 position: code_range.end,
84 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
85 height: 1,
86 style: BlockStyle::Sticky,
87 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
88 disposition: BlockDisposition::Below,
89 priority: 0,
90 };
91
92 let block_id = editor.insert_blocks([block], None, cx)[0];
93 (block_id, invalidation_anchor)
94 })?;
95
96 anyhow::Ok(Self {
97 code_range,
98 invalidation_anchor,
99 block_id,
100 execution_view,
101 })
102 }
103
104 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
105 self.execution_view.update(cx, |execution_view, cx| {
106 execution_view.push_message(&message.content, cx);
107 });
108 }
109
110 fn create_output_area_renderer(
111 execution_view: View<ExecutionView>,
112 on_close: CloseBlockFn,
113 ) -> RenderBlock {
114 let render = move |cx: &mut BlockContext| {
115 let execution_view = execution_view.clone();
116 let text_style = stdio::text_style(cx);
117
118 let gutter = cx.gutter_dimensions;
119
120 let block_id = cx.block_id;
121 let on_close = on_close.clone();
122
123 let rem_size = cx.rem_size();
124
125 let text_line_height = text_style.line_height_in_pixels(rem_size);
126
127 let close_button = h_flex()
128 .flex_none()
129 .items_center()
130 .justify_center()
131 .absolute()
132 .top(text_line_height / 2.)
133 .right(
134 // 2px is a magic number to nudge the button just a bit closer to
135 // the line number start
136 gutter.full_width() / 2.0 - text_line_height / 2.0 - px(2.),
137 )
138 .w(text_line_height)
139 .h(text_line_height)
140 .child(
141 IconButton::new(
142 ("close_output_area", EntityId::from(cx.block_id)),
143 IconName::Close,
144 )
145 .icon_size(IconSize::Small)
146 .icon_color(Color::Muted)
147 .size(ButtonSize::Compact)
148 .shape(IconButtonShape::Square)
149 .tooltip(|cx| Tooltip::text("Close output area", cx))
150 .on_click(move |_, cx| {
151 if let BlockId::Custom(block_id) = block_id {
152 (on_close)(block_id, cx)
153 }
154 }),
155 );
156
157 div()
158 .flex()
159 .items_start()
160 .min_h(text_line_height)
161 .w_full()
162 .border_y_1()
163 .border_color(cx.theme().colors().border)
164 .bg(cx.theme().colors().background)
165 .child(
166 div()
167 .relative()
168 .w(gutter.full_width())
169 .h(text_line_height * 2)
170 .child(close_button),
171 )
172 .child(
173 div()
174 .flex_1()
175 .size_full()
176 .py(text_line_height / 2.)
177 .mr(gutter.width)
178 .child(execution_view),
179 )
180 .into_any_element()
181 };
182
183 Box::new(render)
184 }
185}
186
187impl Session {
188 pub fn new(
189 editor: WeakView<Editor>,
190 fs: Arc<dyn Fs>,
191 telemetry: Arc<Telemetry>,
192 kernel_specification: KernelSpecification,
193 cx: &mut ViewContext<Self>,
194 ) -> Self {
195 let kernel_language = kernel_specification.kernelspec.language.clone();
196
197 telemetry.report_repl_event(
198 kernel_language.clone(),
199 KernelStatus::Starting.to_string(),
200 cx.entity_id().to_string(),
201 );
202
203 let entity_id = editor.entity_id();
204 let working_directory = editor
205 .upgrade()
206 .and_then(|editor| editor.read(cx).working_directory(cx))
207 .unwrap_or_else(temp_dir);
208 let kernel = RunningKernel::new(
209 kernel_specification.clone(),
210 entity_id,
211 working_directory,
212 fs.clone(),
213 cx,
214 );
215
216 let pending_kernel = cx
217 .spawn(|this, mut cx| async move {
218 let kernel = kernel.await;
219
220 match kernel {
221 Ok((mut kernel, mut messages_rx)) => {
222 this.update(&mut cx, |session, cx| {
223 let stderr = kernel.process.stderr.take();
224
225 cx.spawn(|_session, mut _cx| async move {
226 if let None = stderr {
227 return;
228 }
229 let reader = BufReader::new(stderr.unwrap());
230 let mut lines = reader.lines();
231 while let Some(Ok(line)) = lines.next().await {
232 log::error!("kernel: {}", line);
233 }
234 })
235 .detach();
236
237 let stdout = kernel.process.stdout.take();
238
239 cx.spawn(|_session, mut _cx| async move {
240 if let None = stdout {
241 return;
242 }
243 let reader = BufReader::new(stdout.unwrap());
244 let mut lines = reader.lines();
245 while let Some(Ok(line)) = lines.next().await {
246 log::info!("kernel: {}", line);
247 }
248 })
249 .detach();
250
251 let status = kernel.process.status();
252 session.kernel(Kernel::RunningKernel(kernel), cx);
253
254 cx.spawn(|session, mut cx| async move {
255 let error_message = match status.await {
256 Ok(status) => {
257 if status.success() {
258 log::info!("kernel process exited successfully");
259 return;
260 }
261
262 format!("kernel process exited with status: {:?}", status)
263 }
264 Err(err) => {
265 format!("kernel process exited with error: {:?}", err)
266 }
267 };
268
269 log::error!("{}", error_message);
270
271 session
272 .update(&mut cx, |session, cx| {
273 session.kernel(
274 Kernel::ErroredLaunch(error_message.clone()),
275 cx,
276 );
277
278 session.blocks.values().for_each(|block| {
279 block.execution_view.update(
280 cx,
281 |execution_view, cx| {
282 match execution_view.status {
283 ExecutionStatus::Finished => {
284 // Do nothing when the output was good
285 }
286 _ => {
287 // All other cases, set the status to errored
288 execution_view.status =
289 ExecutionStatus::KernelErrored(
290 error_message.clone(),
291 )
292 }
293 }
294 cx.notify();
295 },
296 );
297 });
298
299 cx.notify();
300 })
301 .ok();
302 })
303 .detach();
304
305 session.messaging_task = cx.spawn(|session, mut cx| async move {
306 while let Some(message) = messages_rx.next().await {
307 session
308 .update(&mut cx, |session, cx| {
309 session.route(&message, cx);
310 })
311 .ok();
312 }
313 });
314
315 // todo!(@rgbkrk): send kernelinforequest once our shell channel read/writes are split
316 // cx.spawn(|this, mut cx| async move {
317 // cx.background_executor()
318 // .timer(Duration::from_millis(120))
319 // .await;
320 // this.update(&mut cx, |this, cx| {
321 // this.send(KernelInfoRequest {}.into(), cx).ok();
322 // })
323 // .ok();
324 // })
325 // .detach();
326 })
327 .ok();
328 }
329 Err(err) => {
330 this.update(&mut cx, |session, cx| {
331 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
332 })
333 .ok();
334 }
335 }
336 })
337 .shared();
338
339 let subscription = match editor.upgrade() {
340 Some(editor) => {
341 let buffer = editor.read(cx).buffer().clone();
342 cx.subscribe(&buffer, Self::on_buffer_event)
343 }
344 None => Subscription::new(|| {}),
345 };
346
347 return Self {
348 editor,
349 kernel: Kernel::StartingKernel(pending_kernel),
350 messaging_task: Task::ready(()),
351 blocks: HashMap::default(),
352 kernel_specification,
353 _buffer_subscription: subscription,
354 telemetry,
355 };
356 }
357
358 fn on_buffer_event(
359 &mut self,
360 buffer: Model<MultiBuffer>,
361 event: &multi_buffer::Event,
362 cx: &mut ViewContext<Self>,
363 ) {
364 if let multi_buffer::Event::Edited { .. } = event {
365 let snapshot = buffer.read(cx).snapshot(cx);
366
367 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
368
369 self.blocks.retain(|_id, block| {
370 if block.invalidation_anchor.is_valid(&snapshot) {
371 true
372 } else {
373 blocks_to_remove.insert(block.block_id);
374 false
375 }
376 });
377
378 if !blocks_to_remove.is_empty() {
379 self.editor
380 .update(cx, |editor, cx| {
381 editor.remove_blocks(blocks_to_remove, None, cx);
382 })
383 .ok();
384 cx.notify();
385 }
386 }
387 }
388
389 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
390 match &mut self.kernel {
391 Kernel::RunningKernel(kernel) => {
392 kernel.request_tx.try_send(message).ok();
393 }
394 _ => {}
395 }
396
397 anyhow::Ok(())
398 }
399
400 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
401 let blocks_to_remove: HashSet<CustomBlockId> =
402 self.blocks.values().map(|block| block.block_id).collect();
403
404 self.editor
405 .update(cx, |editor, cx| {
406 editor.remove_blocks(blocks_to_remove, None, cx);
407 })
408 .ok();
409
410 self.blocks.clear();
411 }
412
413 pub fn execute(
414 &mut self,
415 code: String,
416 anchor_range: Range<Anchor>,
417 next_cell: Option<Anchor>,
418 move_down: bool,
419 cx: &mut ViewContext<Self>,
420 ) {
421 let Some(editor) = self.editor.upgrade() else {
422 return;
423 };
424
425 if code.is_empty() {
426 return;
427 }
428
429 let execute_request = ExecuteRequest {
430 code,
431 ..ExecuteRequest::default()
432 };
433
434 let message: JupyterMessage = execute_request.into();
435
436 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
437
438 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
439
440 self.blocks.retain(|_key, block| {
441 if anchor_range.overlaps(&block.code_range, &buffer) {
442 blocks_to_remove.insert(block.block_id);
443 false
444 } else {
445 true
446 }
447 });
448
449 self.editor
450 .update(cx, |editor, cx| {
451 editor.remove_blocks(blocks_to_remove, None, cx);
452 })
453 .ok();
454
455 let status = match &self.kernel {
456 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
457 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
458 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
459 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
460 Kernel::Shutdown => ExecutionStatus::Shutdown,
461 };
462
463 let parent_message_id = message.header.msg_id.clone();
464 let session_view = cx.view().downgrade();
465 let weak_editor = self.editor.clone();
466
467 let on_close: CloseBlockFn =
468 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
469 if let Some(session) = session_view.upgrade() {
470 session.update(cx, |session, cx| {
471 session.blocks.remove(&parent_message_id);
472 cx.notify();
473 });
474 }
475
476 if let Some(editor) = weak_editor.upgrade() {
477 editor.update(cx, |editor, cx| {
478 let mut block_ids = HashSet::default();
479 block_ids.insert(block_id);
480 editor.remove_blocks(block_ids, None, cx);
481 });
482 }
483 });
484
485 let Ok(editor_block) =
486 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
487 else {
488 return;
489 };
490
491 let new_cursor_pos = if let Some(next_cursor) = next_cell {
492 next_cursor
493 } else {
494 editor_block.invalidation_anchor
495 };
496
497 self.blocks
498 .insert(message.header.msg_id.clone(), editor_block);
499
500 match &self.kernel {
501 Kernel::RunningKernel(_) => {
502 self.send(message, cx).ok();
503 }
504 Kernel::StartingKernel(task) => {
505 // Queue up the execution as a task to run after the kernel starts
506 let task = task.clone();
507 let message = message.clone();
508
509 cx.spawn(|this, mut cx| async move {
510 task.await;
511 this.update(&mut cx, |session, cx| {
512 session.send(message, cx).ok();
513 })
514 .ok();
515 })
516 .detach();
517 }
518 _ => {}
519 }
520
521 if move_down {
522 editor.update(cx, move |editor, cx| {
523 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
524 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
525 });
526 });
527 }
528 }
529
530 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
531 let parent_message_id = match message.parent_header.as_ref() {
532 Some(header) => &header.msg_id,
533 None => return,
534 };
535
536 match &message.content {
537 JupyterMessageContent::Status(status) => {
538 self.kernel.set_execution_state(&status.execution_state);
539
540 self.telemetry.report_repl_event(
541 self.kernel_specification.kernelspec.language.clone(),
542 KernelStatus::from(&self.kernel).to_string(),
543 cx.entity_id().to_string(),
544 );
545
546 cx.notify();
547 }
548 JupyterMessageContent::KernelInfoReply(reply) => {
549 self.kernel.set_kernel_info(&reply);
550 cx.notify();
551 }
552 JupyterMessageContent::UpdateDisplayData(update) => {
553 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
554 display_id
555 } else {
556 return;
557 };
558
559 self.blocks.iter_mut().for_each(|(_, block)| {
560 block.execution_view.update(cx, |execution_view, cx| {
561 execution_view.update_display_data(&update.data, &display_id, cx);
562 });
563 });
564 return;
565 }
566 _ => {}
567 }
568
569 if let Some(block) = self.blocks.get_mut(parent_message_id) {
570 block.handle_message(&message, cx);
571 return;
572 }
573 }
574
575 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
576 match &mut self.kernel {
577 Kernel::RunningKernel(_kernel) => {
578 self.send(InterruptRequest {}.into(), cx).ok();
579 }
580 Kernel::StartingKernel(_task) => {
581 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
582 }
583 _ => {}
584 }
585 }
586
587 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
588 if let Kernel::Shutdown = kernel {
589 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
590 }
591
592 let kernel_status = KernelStatus::from(&kernel).to_string();
593 let kernel_language = self.kernel_specification.kernelspec.language.clone();
594
595 self.telemetry.report_repl_event(
596 kernel_language,
597 kernel_status,
598 cx.entity_id().to_string(),
599 );
600
601 self.kernel = kernel;
602 }
603
604 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
605 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
606
607 match kernel {
608 Kernel::RunningKernel(mut kernel) => {
609 let mut request_tx = kernel.request_tx.clone();
610
611 cx.spawn(|this, mut cx| async move {
612 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
613 request_tx.try_send(message).ok();
614
615 // Give the kernel a bit of time to clean up
616 cx.background_executor().timer(Duration::from_secs(3)).await;
617
618 kernel.process.kill().ok();
619
620 this.update(&mut cx, |session, cx| {
621 session.clear_outputs(cx);
622 session.kernel(Kernel::Shutdown, cx);
623 cx.notify();
624 })
625 .ok();
626 })
627 .detach();
628 }
629 Kernel::StartingKernel(_kernel) => {
630 self.kernel = Kernel::Shutdown;
631 }
632 _ => {
633 self.kernel = Kernel::Shutdown;
634 }
635 }
636 cx.notify();
637 }
638}
639
640pub enum SessionEvent {
641 Shutdown(WeakView<Editor>),
642}
643
644impl EventEmitter<SessionEvent> for Session {}
645
646impl Render for Session {
647 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
648 let (status_text, interrupt_button) = match &self.kernel {
649 Kernel::RunningKernel(kernel) => (
650 kernel
651 .kernel_info
652 .as_ref()
653 .map(|info| info.language_info.name.clone()),
654 Some(
655 Button::new("interrupt", "Interrupt")
656 .style(ButtonStyle::Subtle)
657 .on_click(cx.listener(move |session, _, cx| {
658 session.interrupt(cx);
659 })),
660 ),
661 ),
662 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
663 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
664 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
665 Kernel::Shutdown => (Some("Shutdown".into()), None),
666 };
667
668 KernelListItem::new(self.kernel_specification.clone())
669 .status_color(match &self.kernel {
670 Kernel::RunningKernel(kernel) => match kernel.execution_state {
671 ExecutionState::Idle => Color::Success,
672 ExecutionState::Busy => Color::Modified,
673 },
674 Kernel::StartingKernel(_) => Color::Modified,
675 Kernel::ErroredLaunch(_) => Color::Error,
676 Kernel::ShuttingDown => Color::Modified,
677 Kernel::Shutdown => Color::Disabled,
678 })
679 .child(Label::new(self.kernel_specification.name.clone()))
680 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
681 .button(
682 Button::new("shutdown", "Shutdown")
683 .style(ButtonStyle::Subtle)
684 .disabled(self.kernel.is_shutting_down())
685 .on_click(cx.listener(move |session, _, cx| {
686 session.shutdown(cx);
687 })),
688 )
689 .buttons(interrupt_button)
690 }
691}