1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use settings::Settings as _;
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::{ActiveTheme, ThemeSettings};
32use ui::{prelude::*, IconButtonShape, Tooltip};
33
34pub struct Session {
35 editor: WeakView<Editor>,
36 pub kernel: Kernel,
37 blocks: HashMap<String, EditorBlock>,
38 messaging_task: Task<()>,
39 pub kernel_specification: KernelSpecification,
40 telemetry: Arc<Telemetry>,
41 _buffer_subscription: Subscription,
42}
43
44struct EditorBlock {
45 code_range: Range<Anchor>,
46 invalidation_anchor: Anchor,
47 block_id: CustomBlockId,
48 execution_view: View<ExecutionView>,
49}
50
51type CloseBlockFn =
52 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakView<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut ViewContext<Session>,
61 ) -> anyhow::Result<Self> {
62 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
63
64 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
65 let buffer = editor.buffer().clone();
66 let buffer_snapshot = buffer.read(cx).snapshot(cx);
67 let end_point = code_range.end.to_point(&buffer_snapshot);
68 let next_row_start = end_point + Point::new(1, 0);
69 if next_row_start > buffer_snapshot.max_point() {
70 buffer.update(cx, |buffer, cx| {
71 buffer.edit(
72 [(
73 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
74 "\n",
75 )],
76 None,
77 cx,
78 )
79 });
80 }
81
82 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
83 let block = BlockProperties {
84 position: code_range.end,
85 // Take up at least one height for status, allow the editor to determine the real height based on the content from render
86 height: 1,
87 style: BlockStyle::Sticky,
88 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
89 disposition: BlockDisposition::Below,
90 };
91
92 let block_id = editor.insert_blocks([block], None, cx)[0];
93 (block_id, invalidation_anchor)
94 })?;
95
96 anyhow::Ok(Self {
97 code_range,
98 invalidation_anchor,
99 block_id,
100 execution_view,
101 })
102 }
103
104 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
105 self.execution_view.update(cx, |execution_view, cx| {
106 execution_view.push_message(&message.content, cx);
107 });
108 }
109
110 fn create_output_area_renderer(
111 execution_view: View<ExecutionView>,
112 on_close: CloseBlockFn,
113 ) -> RenderBlock {
114 let render = move |cx: &mut BlockContext| {
115 let execution_view = execution_view.clone();
116 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118
119 let gutter = cx.gutter_dimensions;
120 let close_button_size = IconSize::XSmall;
121
122 let block_id = cx.block_id;
123 let on_close = on_close.clone();
124
125 let rem_size = cx.rem_size();
126 let line_height = cx.text_style().line_height_in_pixels(rem_size);
127
128 let (close_button_width, close_button_padding) =
129 close_button_size.square_components(cx);
130
131 div()
132 .min_h(line_height)
133 .flex()
134 .flex_row()
135 .items_start()
136 .w_full()
137 .bg(cx.theme().colors().background)
138 .border_y_1()
139 .border_color(cx.theme().colors().border)
140 .child(
141 v_flex().min_h(cx.line_height()).justify_center().child(
142 h_flex()
143 .w(gutter.full_width())
144 .justify_end()
145 .pt(line_height / 2.)
146 .child(
147 h_flex()
148 .pr(gutter.width / 2. - close_button_width
149 + close_button_padding / 2.)
150 .child(
151 IconButton::new(
152 ("close_output_area", EntityId::from(cx.block_id)),
153 IconName::Close,
154 )
155 .shape(IconButtonShape::Square)
156 .icon_size(close_button_size)
157 .icon_color(Color::Muted)
158 .tooltip(|cx| Tooltip::text("Close output area", cx))
159 .on_click(
160 move |_, cx| {
161 if let BlockId::Custom(block_id) = block_id {
162 (on_close)(block_id, cx)
163 }
164 },
165 ),
166 ),
167 ),
168 ),
169 )
170 .child(
171 div()
172 .flex_1()
173 .size_full()
174 .my_2()
175 .mr(gutter.width)
176 .text_size(text_font_size)
177 .font_family(text_font)
178 .child(execution_view),
179 )
180 .into_any_element()
181 };
182
183 Box::new(render)
184 }
185}
186
187impl Session {
188 pub fn new(
189 editor: WeakView<Editor>,
190 fs: Arc<dyn Fs>,
191 telemetry: Arc<Telemetry>,
192 kernel_specification: KernelSpecification,
193 cx: &mut ViewContext<Self>,
194 ) -> Self {
195 let kernel_language = kernel_specification.kernelspec.language.clone();
196
197 telemetry.report_repl_event(
198 kernel_language.clone(),
199 KernelStatus::Starting.to_string(),
200 cx.entity_id().to_string(),
201 );
202
203 let entity_id = editor.entity_id();
204 let working_directory = editor
205 .upgrade()
206 .and_then(|editor| editor.read(cx).working_directory(cx))
207 .unwrap_or_else(temp_dir);
208 let kernel = RunningKernel::new(
209 kernel_specification.clone(),
210 entity_id,
211 working_directory,
212 fs.clone(),
213 cx,
214 );
215
216 let pending_kernel = cx
217 .spawn(|this, mut cx| async move {
218 let kernel = kernel.await;
219
220 match kernel {
221 Ok((mut kernel, mut messages_rx)) => {
222 this.update(&mut cx, |session, cx| {
223 // At this point we can create a new kind of kernel that has the process and our long running background tasks
224
225 let stderr = kernel.process.stderr.take();
226
227 cx.spawn(|_session, mut _cx| async move {
228 if let None = stderr {
229 return;
230 }
231 let reader = BufReader::new(stderr.unwrap());
232 let mut lines = reader.lines();
233 while let Some(Ok(line)) = lines.next().await {
234 log::error!("kernel: {}", line);
235 }
236 })
237 .detach();
238
239 let stdout = kernel.process.stderr.take();
240
241 cx.spawn(|_session, mut _cx| async move {
242 if let None = stdout {
243 return;
244 }
245 let reader = BufReader::new(stdout.unwrap());
246 let mut lines = reader.lines();
247 while let Some(Ok(line)) = lines.next().await {
248 log::info!("kernel: {}", line);
249 }
250 })
251 .detach();
252
253 let status = kernel.process.status();
254 session.kernel(Kernel::RunningKernel(kernel), cx);
255
256 cx.spawn(|session, mut cx| async move {
257 let error_message = match status.await {
258 Ok(status) => {
259 if status.success() {
260 log::info!("kernel process exited successfully");
261 return;
262 }
263
264 format!("kernel process exited with status: {:?}", status)
265 }
266 Err(err) => {
267 format!("kernel process exited with error: {:?}", err)
268 }
269 };
270
271 log::error!("{}", error_message);
272
273 session
274 .update(&mut cx, |session, cx| {
275 session.kernel(
276 Kernel::ErroredLaunch(error_message.clone()),
277 cx,
278 );
279
280 session.blocks.values().for_each(|block| {
281 block.execution_view.update(
282 cx,
283 |execution_view, cx| {
284 match execution_view.status {
285 ExecutionStatus::Finished => {
286 // Do nothing when the output was good
287 }
288 _ => {
289 // All other cases, set the status to errored
290 execution_view.status =
291 ExecutionStatus::KernelErrored(
292 error_message.clone(),
293 )
294 }
295 }
296 cx.notify();
297 },
298 );
299 });
300
301 cx.notify();
302 })
303 .ok();
304 })
305 .detach();
306
307 session.messaging_task = cx.spawn(|session, mut cx| async move {
308 while let Some(message) = messages_rx.next().await {
309 session
310 .update(&mut cx, |session, cx| {
311 session.route(&message, cx);
312 })
313 .ok();
314 }
315 });
316
317 // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
318 // cx.spawn(|this, mut cx| async move {
319 // cx.background_executor()
320 // .timer(Duration::from_millis(120))
321 // .await;
322 // this.update(&mut cx, |this, cx| {
323 // this.send(KernelInfoRequest {}.into(), cx).ok();
324 // })
325 // .ok();
326 // })
327 // .detach();
328 })
329 .ok();
330 }
331 Err(err) => {
332 this.update(&mut cx, |session, cx| {
333 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
334 })
335 .ok();
336 }
337 }
338 })
339 .shared();
340
341 let subscription = match editor.upgrade() {
342 Some(editor) => {
343 let buffer = editor.read(cx).buffer().clone();
344 cx.subscribe(&buffer, Self::on_buffer_event)
345 }
346 None => Subscription::new(|| {}),
347 };
348
349 return Self {
350 editor,
351 kernel: Kernel::StartingKernel(pending_kernel),
352 messaging_task: Task::ready(()),
353 blocks: HashMap::default(),
354 kernel_specification,
355 _buffer_subscription: subscription,
356 telemetry,
357 };
358 }
359
360 fn on_buffer_event(
361 &mut self,
362 buffer: Model<MultiBuffer>,
363 event: &multi_buffer::Event,
364 cx: &mut ViewContext<Self>,
365 ) {
366 if let multi_buffer::Event::Edited { .. } = event {
367 let snapshot = buffer.read(cx).snapshot(cx);
368
369 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
370
371 self.blocks.retain(|_id, block| {
372 if block.invalidation_anchor.is_valid(&snapshot) {
373 true
374 } else {
375 blocks_to_remove.insert(block.block_id);
376 false
377 }
378 });
379
380 if !blocks_to_remove.is_empty() {
381 self.editor
382 .update(cx, |editor, cx| {
383 editor.remove_blocks(blocks_to_remove, None, cx);
384 })
385 .ok();
386 cx.notify();
387 }
388 }
389 }
390
391 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
392 match &mut self.kernel {
393 Kernel::RunningKernel(kernel) => {
394 kernel.request_tx.try_send(message).ok();
395 }
396 _ => {}
397 }
398
399 anyhow::Ok(())
400 }
401
402 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
403 let blocks_to_remove: HashSet<CustomBlockId> =
404 self.blocks.values().map(|block| block.block_id).collect();
405
406 self.editor
407 .update(cx, |editor, cx| {
408 editor.remove_blocks(blocks_to_remove, None, cx);
409 })
410 .ok();
411
412 self.blocks.clear();
413 }
414
415 pub fn execute(
416 &mut self,
417 code: String,
418 anchor_range: Range<Anchor>,
419 next_cell: Option<Anchor>,
420 cx: &mut ViewContext<Self>,
421 ) {
422 let Some(editor) = self.editor.upgrade() else {
423 return;
424 };
425
426 if code.is_empty() {
427 return;
428 }
429
430 let execute_request = ExecuteRequest {
431 code,
432 ..ExecuteRequest::default()
433 };
434
435 let message: JupyterMessage = execute_request.into();
436
437 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
438
439 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
440
441 self.blocks.retain(|_key, block| {
442 if anchor_range.overlaps(&block.code_range, &buffer) {
443 blocks_to_remove.insert(block.block_id);
444 false
445 } else {
446 true
447 }
448 });
449
450 self.editor
451 .update(cx, |editor, cx| {
452 editor.remove_blocks(blocks_to_remove, None, cx);
453 })
454 .ok();
455
456 let status = match &self.kernel {
457 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
458 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
459 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
460 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
461 Kernel::Shutdown => ExecutionStatus::Shutdown,
462 };
463
464 let parent_message_id = message.header.msg_id.clone();
465 let session_view = cx.view().downgrade();
466 let weak_editor = self.editor.clone();
467
468 let on_close: CloseBlockFn =
469 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
470 if let Some(session) = session_view.upgrade() {
471 session.update(cx, |session, cx| {
472 session.blocks.remove(&parent_message_id);
473 cx.notify();
474 });
475 }
476
477 if let Some(editor) = weak_editor.upgrade() {
478 editor.update(cx, |editor, cx| {
479 let mut block_ids = HashSet::default();
480 block_ids.insert(block_id);
481 editor.remove_blocks(block_ids, None, cx);
482 });
483 }
484 });
485
486 let Ok(editor_block) =
487 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
488 else {
489 return;
490 };
491
492 let new_cursor_pos = if let Some(next_cursor) = next_cell {
493 next_cursor
494 } else {
495 editor_block.invalidation_anchor
496 };
497
498 self.blocks
499 .insert(message.header.msg_id.clone(), editor_block);
500
501 match &self.kernel {
502 Kernel::RunningKernel(_) => {
503 self.send(message, cx).ok();
504 }
505 Kernel::StartingKernel(task) => {
506 // Queue up the execution as a task to run after the kernel starts
507 let task = task.clone();
508 let message = message.clone();
509
510 cx.spawn(|this, mut cx| async move {
511 task.await;
512 this.update(&mut cx, |session, cx| {
513 session.send(message, cx).ok();
514 })
515 .ok();
516 })
517 .detach();
518 }
519 _ => {}
520 }
521
522 // Now move the cursor to after the block
523 editor.update(cx, move |editor, cx| {
524 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
525 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
526 });
527 });
528 }
529
530 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
531 let parent_message_id = match message.parent_header.as_ref() {
532 Some(header) => &header.msg_id,
533 None => return,
534 };
535
536 match &message.content {
537 JupyterMessageContent::Status(status) => {
538 self.kernel.set_execution_state(&status.execution_state);
539
540 self.telemetry.report_repl_event(
541 self.kernel_specification.kernelspec.language.clone(),
542 KernelStatus::from(&self.kernel).to_string(),
543 cx.entity_id().to_string(),
544 );
545
546 cx.notify();
547 }
548 JupyterMessageContent::KernelInfoReply(reply) => {
549 self.kernel.set_kernel_info(&reply);
550 cx.notify();
551 }
552 JupyterMessageContent::UpdateDisplayData(update) => {
553 let display_id = if let Some(display_id) = update.transient.display_id.clone() {
554 display_id
555 } else {
556 return;
557 };
558
559 self.blocks.iter_mut().for_each(|(_, block)| {
560 block.execution_view.update(cx, |execution_view, cx| {
561 execution_view.update_display_data(&update.data, &display_id, cx);
562 });
563 });
564 return;
565 }
566 _ => {}
567 }
568
569 if let Some(block) = self.blocks.get_mut(parent_message_id) {
570 block.handle_message(&message, cx);
571 return;
572 }
573 }
574
575 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
576 match &mut self.kernel {
577 Kernel::RunningKernel(_kernel) => {
578 self.send(InterruptRequest {}.into(), cx).ok();
579 }
580 Kernel::StartingKernel(_task) => {
581 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
582 }
583 _ => {}
584 }
585 }
586
587 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
588 if let Kernel::Shutdown = kernel {
589 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
590 }
591
592 let kernel_status = KernelStatus::from(&kernel).to_string();
593 let kernel_language = self.kernel_specification.kernelspec.language.clone();
594
595 self.telemetry.report_repl_event(
596 kernel_language,
597 kernel_status,
598 cx.entity_id().to_string(),
599 );
600
601 self.kernel = kernel;
602 }
603
604 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
605 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
606
607 match kernel {
608 Kernel::RunningKernel(mut kernel) => {
609 let mut request_tx = kernel.request_tx.clone();
610
611 cx.spawn(|this, mut cx| async move {
612 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
613 request_tx.try_send(message).ok();
614
615 // Give the kernel a bit of time to clean up
616 cx.background_executor().timer(Duration::from_secs(3)).await;
617
618 kernel.process.kill().ok();
619
620 this.update(&mut cx, |session, cx| {
621 session.clear_outputs(cx);
622 session.kernel(Kernel::Shutdown, cx);
623 cx.notify();
624 })
625 .ok();
626 })
627 .detach();
628 }
629 Kernel::StartingKernel(_kernel) => {
630 self.kernel = Kernel::Shutdown;
631 }
632 _ => {
633 self.kernel = Kernel::Shutdown;
634 }
635 }
636 cx.notify();
637 }
638}
639
640pub enum SessionEvent {
641 Shutdown(WeakView<Editor>),
642}
643
644impl EventEmitter<SessionEvent> for Session {}
645
646impl Render for Session {
647 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
648 let (status_text, interrupt_button) = match &self.kernel {
649 Kernel::RunningKernel(kernel) => (
650 kernel
651 .kernel_info
652 .as_ref()
653 .map(|info| info.language_info.name.clone()),
654 Some(
655 Button::new("interrupt", "Interrupt")
656 .style(ButtonStyle::Subtle)
657 .on_click(cx.listener(move |session, _, cx| {
658 session.interrupt(cx);
659 })),
660 ),
661 ),
662 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
663 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
664 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
665 Kernel::Shutdown => (Some("Shutdown".into()), None),
666 };
667
668 KernelListItem::new(self.kernel_specification.clone())
669 .status_color(match &self.kernel {
670 Kernel::RunningKernel(kernel) => match kernel.execution_state {
671 ExecutionState::Idle => Color::Success,
672 ExecutionState::Busy => Color::Modified,
673 },
674 Kernel::StartingKernel(_) => Color::Modified,
675 Kernel::ErroredLaunch(_) => Color::Error,
676 Kernel::ShuttingDown => Color::Modified,
677 Kernel::Shutdown => Color::Disabled,
678 })
679 .child(Label::new(self.kernel_specification.name.clone()))
680 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
681 .button(
682 Button::new("shutdown", "Shutdown")
683 .style(ButtonStyle::Subtle)
684 .disabled(self.kernel.is_shutting_down())
685 .on_click(cx.listener(move |session, _, cx| {
686 session.shutdown(cx);
687 })),
688 )
689 .buttons(interrupt_button)
690 }
691}