1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use settings::Settings as _;
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::{ActiveTheme, ThemeSettings};
32use ui::{prelude::*, IconButtonShape, Tooltip};
33
34pub struct Session {
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Task<()>,
39 pub kernel_specification: KernelSpecification,
40 telemetry: Arc<Telemetry>,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 editor: WeakView<Editor>,
46 code_range: Range<Anchor>,
47 invalidation_anchor: Anchor,
48 block_id: CustomBlockId,
49 execution_view: View<ExecutionView>,
50 on_close: CloseBlockFn,
51}
52
53type CloseBlockFn =
54 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
55
56impl EditorBlock {
57 fn new(
58 editor: WeakView<Editor>,
59 code_range: Range<Anchor>,
60 status: ExecutionStatus,
61 on_close: CloseBlockFn,
62 cx: &mut ViewContext<Session>,
63 ) -> anyhow::Result<Self> {
64 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
65
66 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
67 let buffer = editor.buffer().clone();
68 let buffer_snapshot = buffer.read(cx).snapshot(cx);
69 let end_point = code_range.end.to_point(&buffer_snapshot);
70 let next_row_start = end_point + Point::new(1, 0);
71 if next_row_start > buffer_snapshot.max_point() {
72 buffer.update(cx, |buffer, cx| {
73 buffer.edit(
74 [(
75 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
76 "\n",
77 )],
78 None,
79 cx,
80 )
81 });
82 }
83
84 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
85 let block = BlockProperties {
86 position: code_range.end,
87 height: execution_view.num_lines(cx).saturating_add(1),
88 style: BlockStyle::Sticky,
89 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
90 disposition: BlockDisposition::Below,
91 };
92
93 let block_id = editor.insert_blocks([block], None, cx)[0];
94 (block_id, invalidation_anchor)
95 })?;
96
97 anyhow::Ok(Self {
98 editor,
99 code_range,
100 invalidation_anchor,
101 block_id,
102 execution_view,
103 on_close,
104 })
105 }
106
107 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
108 self.execution_view.update(cx, |execution_view, cx| {
109 execution_view.push_message(&message.content, cx);
110 });
111
112 self.editor
113 .update(cx, |editor, cx| {
114 let mut replacements = HashMap::default();
115
116 replacements.insert(
117 self.block_id,
118 (
119 Some(self.execution_view.num_lines(cx).saturating_add(1)),
120 Self::create_output_area_renderer(
121 self.execution_view.clone(),
122 self.on_close.clone(),
123 ),
124 ),
125 );
126 editor.replace_blocks(replacements, None, cx);
127 })
128 .ok();
129 }
130
131 fn create_output_area_renderer(
132 execution_view: View<ExecutionView>,
133 on_close: CloseBlockFn,
134 ) -> RenderBlock {
135 let render = move |cx: &mut BlockContext| {
136 let execution_view = execution_view.clone();
137 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
138 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
139
140 let gutter = cx.gutter_dimensions;
141 let close_button_size = IconSize::XSmall;
142
143 let block_id = cx.block_id;
144 let on_close = on_close.clone();
145
146 let rem_size = cx.rem_size();
147 let line_height = cx.text_style().line_height_in_pixels(rem_size);
148
149 let (close_button_width, close_button_padding) =
150 close_button_size.square_components(cx);
151
152 div()
153 .min_h(line_height)
154 .flex()
155 .flex_row()
156 .items_start()
157 .w_full()
158 .bg(cx.theme().colors().background)
159 .border_y_1()
160 .border_color(cx.theme().colors().border)
161 .child(
162 v_flex().min_h(cx.line_height()).justify_center().child(
163 h_flex()
164 .w(gutter.full_width())
165 .justify_end()
166 .pt(line_height / 2.)
167 .child(
168 h_flex()
169 .pr(gutter.width / 2. - close_button_width
170 + close_button_padding / 2.)
171 .child(
172 IconButton::new(
173 ("close_output_area", EntityId::from(cx.block_id)),
174 IconName::Close,
175 )
176 .shape(IconButtonShape::Square)
177 .icon_size(close_button_size)
178 .icon_color(Color::Muted)
179 .tooltip(|cx| Tooltip::text("Close output area", cx))
180 .on_click(
181 move |_, cx| {
182 if let BlockId::Custom(block_id) = block_id {
183 (on_close)(block_id, cx)
184 }
185 },
186 ),
187 ),
188 ),
189 ),
190 )
191 .child(
192 div()
193 .flex_1()
194 .size_full()
195 .my_2()
196 .mr(gutter.width)
197 .text_size(text_font_size)
198 .font_family(text_font)
199 .child(execution_view),
200 )
201 .into_any_element()
202 };
203
204 Box::new(render)
205 }
206}
207
208impl Session {
209 pub fn new(
210 editor: WeakView<Editor>,
211 fs: Arc<dyn Fs>,
212 telemetry: Arc<Telemetry>,
213 kernel_specification: KernelSpecification,
214 cx: &mut ViewContext<Self>,
215 ) -> Self {
216 let kernel_language = kernel_specification.kernelspec.language.clone();
217
218 telemetry.report_repl_event(
219 kernel_language.clone(),
220 KernelStatus::Starting.to_string(),
221 cx.entity_id().to_string(),
222 );
223
224 let entity_id = editor.entity_id();
225 let working_directory = editor
226 .upgrade()
227 .and_then(|editor| editor.read(cx).working_directory(cx))
228 .unwrap_or_else(temp_dir);
229 let kernel = RunningKernel::new(
230 kernel_specification.clone(),
231 entity_id,
232 working_directory,
233 fs.clone(),
234 cx,
235 );
236
237 let pending_kernel = cx
238 .spawn(|this, mut cx| async move {
239 let kernel = kernel.await;
240
241 match kernel {
242 Ok((mut kernel, mut messages_rx)) => {
243 this.update(&mut cx, |session, cx| {
244 // At this point we can create a new kind of kernel that has the process and our long running background tasks
245
246 let stderr = kernel.process.stderr.take();
247
248 cx.spawn(|_session, mut _cx| async move {
249 if let None = stderr {
250 return;
251 }
252 let reader = BufReader::new(stderr.unwrap());
253 let mut lines = reader.lines();
254 while let Some(Ok(line)) = lines.next().await {
255 log::error!("kernel: {}", line);
256 }
257 })
258 .detach();
259
260 let stdout = kernel.process.stderr.take();
261
262 cx.spawn(|_session, mut _cx| async move {
263 if let None = stdout {
264 return;
265 }
266 let reader = BufReader::new(stdout.unwrap());
267 let mut lines = reader.lines();
268 while let Some(Ok(line)) = lines.next().await {
269 log::info!("kernel: {}", line);
270 }
271 })
272 .detach();
273
274 let status = kernel.process.status();
275 session.kernel(Kernel::RunningKernel(kernel), cx);
276
277 cx.spawn(|session, mut cx| async move {
278 let error_message = match status.await {
279 Ok(status) => {
280 if status.success() {
281 log::info!("kernel process exited successfully");
282 return;
283 }
284
285 format!("kernel process exited with status: {:?}", status)
286 }
287 Err(err) => {
288 format!("kernel process exited with error: {:?}", err)
289 }
290 };
291
292 log::error!("{}", error_message);
293
294 session
295 .update(&mut cx, |session, cx| {
296 session.kernel(
297 Kernel::ErroredLaunch(error_message.clone()),
298 cx,
299 );
300
301 session.blocks.values().for_each(|block| {
302 block.execution_view.update(
303 cx,
304 |execution_view, cx| {
305 match execution_view.status {
306 ExecutionStatus::Finished => {
307 // Do nothing when the output was good
308 }
309 _ => {
310 // All other cases, set the status to errored
311 execution_view.status =
312 ExecutionStatus::KernelErrored(
313 error_message.clone(),
314 )
315 }
316 }
317 cx.notify();
318 },
319 );
320 });
321
322 cx.notify();
323 })
324 .ok();
325 })
326 .detach();
327
328 session.messaging_task = cx.spawn(|session, mut cx| async move {
329 while let Some(message) = messages_rx.next().await {
330 session
331 .update(&mut cx, |session, cx| {
332 session.route(&message, cx);
333 })
334 .ok();
335 }
336 });
337
338 // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
339 // cx.spawn(|this, mut cx| async move {
340 // cx.background_executor()
341 // .timer(Duration::from_millis(120))
342 // .await;
343 // this.update(&mut cx, |this, cx| {
344 // this.send(KernelInfoRequest {}.into(), cx).ok();
345 // })
346 // .ok();
347 // })
348 // .detach();
349 })
350 .ok();
351 }
352 Err(err) => {
353 this.update(&mut cx, |session, cx| {
354 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
355 })
356 .ok();
357 }
358 }
359 })
360 .shared();
361
362 let subscription = match editor.upgrade() {
363 Some(editor) => {
364 let buffer = editor.read(cx).buffer().clone();
365 cx.subscribe(&buffer, Self::on_buffer_event)
366 }
367 None => Subscription::new(|| {}),
368 };
369
370 return Self {
371 editor,
372 kernel: Kernel::StartingKernel(pending_kernel),
373 messaging_task: Task::ready(()),
374 blocks: HashMap::default(),
375 kernel_specification,
376 _buffer_subscription: subscription,
377 telemetry,
378 };
379 }
380
381 fn on_buffer_event(
382 &mut self,
383 buffer: Model<MultiBuffer>,
384 event: &multi_buffer::Event,
385 cx: &mut ViewContext<Self>,
386 ) {
387 if let multi_buffer::Event::Edited { .. } = event {
388 let snapshot = buffer.read(cx).snapshot(cx);
389
390 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
391
392 self.blocks.retain(|_id, block| {
393 if block.invalidation_anchor.is_valid(&snapshot) {
394 true
395 } else {
396 blocks_to_remove.insert(block.block_id);
397 false
398 }
399 });
400
401 if !blocks_to_remove.is_empty() {
402 self.editor
403 .update(cx, |editor, cx| {
404 editor.remove_blocks(blocks_to_remove, None, cx);
405 })
406 .ok();
407 cx.notify();
408 }
409 }
410 }
411
412 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
413 match &mut self.kernel {
414 Kernel::RunningKernel(kernel) => {
415 kernel.request_tx.try_send(message).ok();
416 }
417 _ => {}
418 }
419
420 anyhow::Ok(())
421 }
422
423 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
424 let blocks_to_remove: HashSet<CustomBlockId> =
425 self.blocks.values().map(|block| block.block_id).collect();
426
427 self.editor
428 .update(cx, |editor, cx| {
429 editor.remove_blocks(blocks_to_remove, None, cx);
430 })
431 .ok();
432
433 self.blocks.clear();
434 }
435
436 pub fn execute(
437 &mut self,
438 code: String,
439 anchor_range: Range<Anchor>,
440 next_cell: Option<Anchor>,
441 cx: &mut ViewContext<Self>,
442 ) {
443 let Some(editor) = self.editor.upgrade() else {
444 return;
445 };
446
447 if code.is_empty() {
448 return;
449 }
450
451 let execute_request = ExecuteRequest {
452 code,
453 ..ExecuteRequest::default()
454 };
455
456 let message: JupyterMessage = execute_request.into();
457
458 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
459
460 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
461
462 self.blocks.retain(|_key, block| {
463 if anchor_range.overlaps(&block.code_range, &buffer) {
464 blocks_to_remove.insert(block.block_id);
465 false
466 } else {
467 true
468 }
469 });
470
471 self.editor
472 .update(cx, |editor, cx| {
473 editor.remove_blocks(blocks_to_remove, None, cx);
474 })
475 .ok();
476
477 let status = match &self.kernel {
478 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
479 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
480 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
481 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
482 Kernel::Shutdown => ExecutionStatus::Shutdown,
483 };
484
485 let parent_message_id = message.header.msg_id.clone();
486 let session_view = cx.view().downgrade();
487 let weak_editor = self.editor.clone();
488
489 let on_close: CloseBlockFn =
490 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
491 if let Some(session) = session_view.upgrade() {
492 session.update(cx, |session, cx| {
493 session.blocks.remove(&parent_message_id);
494 cx.notify();
495 });
496 }
497
498 if let Some(editor) = weak_editor.upgrade() {
499 editor.update(cx, |editor, cx| {
500 let mut block_ids = HashSet::default();
501 block_ids.insert(block_id);
502 editor.remove_blocks(block_ids, None, cx);
503 });
504 }
505 });
506
507 let Ok(editor_block) =
508 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
509 else {
510 return;
511 };
512
513 let new_cursor_pos = if let Some(next_cursor) = next_cell {
514 next_cursor
515 } else {
516 editor_block.invalidation_anchor
517 };
518
519 self.blocks
520 .insert(message.header.msg_id.clone(), editor_block);
521
522 match &self.kernel {
523 Kernel::RunningKernel(_) => {
524 self.send(message, cx).ok();
525 }
526 Kernel::StartingKernel(task) => {
527 // Queue up the execution as a task to run after the kernel starts
528 let task = task.clone();
529 let message = message.clone();
530
531 cx.spawn(|this, mut cx| async move {
532 task.await;
533 this.update(&mut cx, |session, cx| {
534 session.send(message, cx).ok();
535 })
536 .ok();
537 })
538 .detach();
539 }
540 _ => {}
541 }
542
543 // Now move the cursor to after the block
544 editor.update(cx, move |editor, cx| {
545 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
546 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
547 });
548 });
549 }
550
551 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
552 let parent_message_id = match message.parent_header.as_ref() {
553 Some(header) => &header.msg_id,
554 None => return,
555 };
556
557 match &message.content {
558 JupyterMessageContent::Status(status) => {
559 self.kernel.set_execution_state(&status.execution_state);
560
561 self.telemetry.report_repl_event(
562 self.kernel_specification.kernelspec.language.clone(),
563 KernelStatus::from(&self.kernel).to_string(),
564 cx.entity_id().to_string(),
565 );
566
567 cx.notify();
568 }
569 JupyterMessageContent::KernelInfoReply(reply) => {
570 self.kernel.set_kernel_info(&reply);
571 cx.notify();
572 }
573 _ => {}
574 }
575
576 if let Some(block) = self.blocks.get_mut(parent_message_id) {
577 block.handle_message(&message, cx);
578 return;
579 }
580 }
581
582 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
583 match &mut self.kernel {
584 Kernel::RunningKernel(_kernel) => {
585 self.send(InterruptRequest {}.into(), cx).ok();
586 }
587 Kernel::StartingKernel(_task) => {
588 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
589 }
590 _ => {}
591 }
592 }
593
594 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
595 if let Kernel::Shutdown = kernel {
596 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
597 }
598
599 let kernel_status = KernelStatus::from(&kernel).to_string();
600 let kernel_language = self.kernel_specification.kernelspec.language.clone();
601
602 self.telemetry.report_repl_event(
603 kernel_language,
604 kernel_status,
605 cx.entity_id().to_string(),
606 );
607
608 self.kernel = kernel;
609 }
610
611 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
612 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
613
614 match kernel {
615 Kernel::RunningKernel(mut kernel) => {
616 let mut request_tx = kernel.request_tx.clone();
617
618 cx.spawn(|this, mut cx| async move {
619 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
620 request_tx.try_send(message).ok();
621
622 // Give the kernel a bit of time to clean up
623 cx.background_executor().timer(Duration::from_secs(3)).await;
624
625 kernel.process.kill().ok();
626
627 this.update(&mut cx, |session, cx| {
628 session.clear_outputs(cx);
629 session.kernel(Kernel::Shutdown, cx);
630 cx.notify();
631 })
632 .ok();
633 })
634 .detach();
635 }
636 Kernel::StartingKernel(_kernel) => {
637 self.kernel = Kernel::Shutdown;
638 }
639 _ => {
640 self.kernel = Kernel::Shutdown;
641 }
642 }
643 cx.notify();
644 }
645}
646
647pub enum SessionEvent {
648 Shutdown(WeakView<Editor>),
649}
650
651impl EventEmitter<SessionEvent> for Session {}
652
653impl Render for Session {
654 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
655 let (status_text, interrupt_button) = match &self.kernel {
656 Kernel::RunningKernel(kernel) => (
657 kernel
658 .kernel_info
659 .as_ref()
660 .map(|info| info.language_info.name.clone()),
661 Some(
662 Button::new("interrupt", "Interrupt")
663 .style(ButtonStyle::Subtle)
664 .on_click(cx.listener(move |session, _, cx| {
665 session.interrupt(cx);
666 })),
667 ),
668 ),
669 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
670 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
671 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
672 Kernel::Shutdown => (Some("Shutdown".into()), None),
673 };
674
675 KernelListItem::new(self.kernel_specification.clone())
676 .status_color(match &self.kernel {
677 Kernel::RunningKernel(kernel) => match kernel.execution_state {
678 ExecutionState::Idle => Color::Success,
679 ExecutionState::Busy => Color::Modified,
680 },
681 Kernel::StartingKernel(_) => Color::Modified,
682 Kernel::ErroredLaunch(_) => Color::Error,
683 Kernel::ShuttingDown => Color::Modified,
684 Kernel::Shutdown => Color::Disabled,
685 })
686 .child(Label::new(self.kernel_specification.name.clone()))
687 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
688 .button(
689 Button::new("shutdown", "Shutdown")
690 .style(ButtonStyle::Subtle)
691 .disabled(self.kernel.is_shutting_down())
692 .on_click(cx.listener(move |session, _, cx| {
693 session.shutdown(cx);
694 })),
695 )
696 .buttons(interrupt_button)
697 }
698}