1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::{FutureExt as _, StreamExt as _};
18use gpui::{
19 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
20 WeakView,
21};
22use language::Point;
23use project::Fs;
24use runtimelib::{
25 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
26 ShutdownRequest,
27};
28use settings::Settings as _;
29use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
30use theme::{ActiveTheme, ThemeSettings};
31use ui::{prelude::*, IconButtonShape, Tooltip};
32
/// A REPL session that ties one editor to one kernel.
///
/// Owns the kernel (in whatever lifecycle state it currently is) and the output
/// blocks that were inserted into the editor for executed code ranges.
pub struct Session {
    /// Editor the outputs are rendered into; held weakly.
    editor: WeakView<Editor>,
    /// Current lifecycle state of the kernel.
    pub kernel: Kernel,
    /// Output blocks, keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that forwards messages received from the kernel to `route`.
    messaging_task: Task<()>,
    /// Specification the kernel was started from.
    pub kernel_specification: KernelSpecification,
    /// Sink for REPL telemetry events.
    telemetry: Arc<Telemetry>,
    /// Subscription to the editor's buffer events (handled by `on_buffer_event`).
    _buffer_subscription: Subscription,
}
42
/// An output area attached to an executed range of code in the editor.
struct EditorBlock {
    /// Editor hosting the block; held weakly.
    editor: WeakView<Editor>,
    /// Range of code whose execution produced this output.
    code_range: Range<Anchor>,
    /// Anchor at the start of the row following `code_range`; once it is no longer
    /// valid in the buffer the block is removed.
    invalidation_anchor: Anchor,
    /// Id of the custom block inserted into the editor.
    block_id: CustomBlockId,
    /// View that renders the execution status and outputs.
    execution_view: View<ExecutionView>,
    /// Callback invoked by the block's close button.
    on_close: CloseBlockFn,
}
51
/// Shared callback invoked with a block's id when its output area is closed.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
54
55impl EditorBlock {
56 fn new(
57 editor: WeakView<Editor>,
58 code_range: Range<Anchor>,
59 status: ExecutionStatus,
60 on_close: CloseBlockFn,
61 cx: &mut ViewContext<Session>,
62 ) -> anyhow::Result<Self> {
63 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
64
65 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
66 let buffer = editor.buffer().clone();
67 let buffer_snapshot = buffer.read(cx).snapshot(cx);
68 let end_point = code_range.end.to_point(&buffer_snapshot);
69 let next_row_start = end_point + Point::new(1, 0);
70 if next_row_start > buffer_snapshot.max_point() {
71 buffer.update(cx, |buffer, cx| {
72 buffer.edit(
73 [(
74 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
75 "\n",
76 )],
77 None,
78 cx,
79 )
80 });
81 }
82
83 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
84 let block = BlockProperties {
85 position: code_range.end,
86 height: execution_view.num_lines(cx).saturating_add(1),
87 style: BlockStyle::Sticky,
88 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
89 disposition: BlockDisposition::Below,
90 };
91
92 let block_id = editor.insert_blocks([block], None, cx)[0];
93 (block_id, invalidation_anchor)
94 })?;
95
96 anyhow::Ok(Self {
97 editor,
98 code_range,
99 invalidation_anchor,
100 block_id,
101 execution_view,
102 on_close,
103 })
104 }
105
106 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
107 self.execution_view.update(cx, |execution_view, cx| {
108 execution_view.push_message(&message.content, cx);
109 });
110
111 self.editor
112 .update(cx, |editor, cx| {
113 let mut replacements = HashMap::default();
114
115 replacements.insert(
116 self.block_id,
117 (
118 Some(self.execution_view.num_lines(cx).saturating_add(1)),
119 Self::create_output_area_renderer(
120 self.execution_view.clone(),
121 self.on_close.clone(),
122 ),
123 ),
124 );
125 editor.replace_blocks(replacements, None, cx);
126 })
127 .ok();
128 }
129
130 fn create_output_area_renderer(
131 execution_view: View<ExecutionView>,
132 on_close: CloseBlockFn,
133 ) -> RenderBlock {
134 let render = move |cx: &mut BlockContext| {
135 let execution_view = execution_view.clone();
136 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
137 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
138
139 let gutter = cx.gutter_dimensions;
140 let close_button_size = IconSize::XSmall;
141
142 let block_id = cx.block_id;
143 let on_close = on_close.clone();
144
145 let rem_size = cx.rem_size();
146 let line_height = cx.text_style().line_height_in_pixels(rem_size);
147
148 let (close_button_width, close_button_padding) =
149 close_button_size.square_components(cx);
150
151 div()
152 .min_h(line_height)
153 .flex()
154 .flex_row()
155 .items_start()
156 .w_full()
157 .bg(cx.theme().colors().background)
158 .border_y_1()
159 .border_color(cx.theme().colors().border)
160 .child(
161 v_flex().min_h(cx.line_height()).justify_center().child(
162 h_flex()
163 .w(gutter.full_width())
164 .justify_end()
165 .pt(line_height / 2.)
166 .child(
167 h_flex()
168 .pr(gutter.width / 2. - close_button_width
169 + close_button_padding / 2.)
170 .child(
171 IconButton::new(
172 ("close_output_area", EntityId::from(cx.block_id)),
173 IconName::Close,
174 )
175 .shape(IconButtonShape::Square)
176 .icon_size(close_button_size)
177 .icon_color(Color::Muted)
178 .tooltip(|cx| Tooltip::text("Close output area", cx))
179 .on_click(
180 move |_, cx| {
181 if let BlockId::Custom(block_id) = block_id {
182 (on_close)(block_id, cx)
183 }
184 },
185 ),
186 ),
187 ),
188 ),
189 )
190 .child(
191 div()
192 .flex_1()
193 .size_full()
194 .my_2()
195 .mr(gutter.width)
196 .text_size(text_font_size)
197 .font_family(text_font)
198 .child(execution_view),
199 )
200 .into_any_element()
201 };
202
203 Box::new(render)
204 }
205}
206
207impl Session {
208 pub fn new(
209 editor: WeakView<Editor>,
210 fs: Arc<dyn Fs>,
211 telemetry: Arc<Telemetry>,
212 kernel_specification: KernelSpecification,
213 cx: &mut ViewContext<Self>,
214 ) -> Self {
215 let kernel_language = kernel_specification.kernelspec.language.clone();
216
217 telemetry.report_repl_event(
218 kernel_language.clone(),
219 KernelStatus::Starting.to_string(),
220 cx.entity_id().to_string(),
221 );
222
223 let entity_id = editor.entity_id();
224 let working_directory = editor
225 .upgrade()
226 .and_then(|editor| editor.read(cx).working_directory(cx))
227 .unwrap_or_else(temp_dir);
228 let kernel = RunningKernel::new(
229 kernel_specification.clone(),
230 entity_id,
231 working_directory,
232 fs.clone(),
233 cx,
234 );
235
236 let pending_kernel = cx
237 .spawn(|this, mut cx| async move {
238 let kernel = kernel.await;
239
240 match kernel {
241 Ok((mut kernel, mut messages_rx)) => {
242 this.update(&mut cx, |session, cx| {
243 // At this point we can create a new kind of kernel that has the process and our long running background tasks
244
245 let status = kernel.process.status();
246 session.kernel(Kernel::RunningKernel(kernel), cx);
247
248 cx.spawn(|session, mut cx| async move {
249 let error_message = match status.await {
250 Ok(status) => {
251 if status.success() {
252 log::info!("kernel process exited successfully");
253 return;
254 }
255
256 format!("kernel process exited with status: {:?}", status)
257 }
258 Err(err) => {
259 format!("kernel process exited with error: {:?}", err)
260 }
261 };
262
263 log::error!("{}", error_message);
264
265 session
266 .update(&mut cx, |session, cx| {
267 session.kernel(
268 Kernel::ErroredLaunch(error_message.clone()),
269 cx,
270 );
271
272 session.blocks.values().for_each(|block| {
273 block.execution_view.update(
274 cx,
275 |execution_view, cx| {
276 match execution_view.status {
277 ExecutionStatus::Finished => {
278 // Do nothing when the output was good
279 }
280 _ => {
281 // All other cases, set the status to errored
282 execution_view.status =
283 ExecutionStatus::KernelErrored(
284 error_message.clone(),
285 )
286 }
287 }
288 cx.notify();
289 },
290 );
291 });
292
293 cx.notify();
294 })
295 .ok();
296 })
297 .detach();
298
299 session.messaging_task = cx.spawn(|session, mut cx| async move {
300 while let Some(message) = messages_rx.next().await {
301 session
302 .update(&mut cx, |session, cx| {
303 session.route(&message, cx);
304 })
305 .ok();
306 }
307 });
308
309 // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
310 // cx.spawn(|this, mut cx| async move {
311 // cx.background_executor()
312 // .timer(Duration::from_millis(120))
313 // .await;
314 // this.update(&mut cx, |this, cx| {
315 // this.send(KernelInfoRequest {}.into(), cx).ok();
316 // })
317 // .ok();
318 // })
319 // .detach();
320 })
321 .ok();
322 }
323 Err(err) => {
324 this.update(&mut cx, |session, cx| {
325 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
326 })
327 .ok();
328 }
329 }
330 })
331 .shared();
332
333 let subscription = match editor.upgrade() {
334 Some(editor) => {
335 let buffer = editor.read(cx).buffer().clone();
336 cx.subscribe(&buffer, Self::on_buffer_event)
337 }
338 None => Subscription::new(|| {}),
339 };
340
341 return Self {
342 editor,
343 kernel: Kernel::StartingKernel(pending_kernel),
344 messaging_task: Task::ready(()),
345 blocks: HashMap::default(),
346 kernel_specification,
347 _buffer_subscription: subscription,
348 telemetry,
349 };
350 }
351
352 fn on_buffer_event(
353 &mut self,
354 buffer: Model<MultiBuffer>,
355 event: &multi_buffer::Event,
356 cx: &mut ViewContext<Self>,
357 ) {
358 if let multi_buffer::Event::Edited { .. } = event {
359 let snapshot = buffer.read(cx).snapshot(cx);
360
361 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
362
363 self.blocks.retain(|_id, block| {
364 if block.invalidation_anchor.is_valid(&snapshot) {
365 true
366 } else {
367 blocks_to_remove.insert(block.block_id);
368 false
369 }
370 });
371
372 if !blocks_to_remove.is_empty() {
373 self.editor
374 .update(cx, |editor, cx| {
375 editor.remove_blocks(blocks_to_remove, None, cx);
376 })
377 .ok();
378 cx.notify();
379 }
380 }
381 }
382
383 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
384 match &mut self.kernel {
385 Kernel::RunningKernel(kernel) => {
386 kernel.request_tx.try_send(message).ok();
387 }
388 _ => {}
389 }
390
391 anyhow::Ok(())
392 }
393
394 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
395 let blocks_to_remove: HashSet<CustomBlockId> =
396 self.blocks.values().map(|block| block.block_id).collect();
397
398 self.editor
399 .update(cx, |editor, cx| {
400 editor.remove_blocks(blocks_to_remove, None, cx);
401 })
402 .ok();
403
404 self.blocks.clear();
405 }
406
407 pub fn execute(
408 &mut self,
409 code: String,
410 anchor_range: Range<Anchor>,
411 next_cell: Option<Anchor>,
412 cx: &mut ViewContext<Self>,
413 ) {
414 let Some(editor) = self.editor.upgrade() else {
415 return;
416 };
417
418 if code.is_empty() {
419 return;
420 }
421
422 let execute_request = ExecuteRequest {
423 code,
424 ..ExecuteRequest::default()
425 };
426
427 let message: JupyterMessage = execute_request.into();
428
429 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
430
431 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
432
433 self.blocks.retain(|_key, block| {
434 if anchor_range.overlaps(&block.code_range, &buffer) {
435 blocks_to_remove.insert(block.block_id);
436 false
437 } else {
438 true
439 }
440 });
441
442 self.editor
443 .update(cx, |editor, cx| {
444 editor.remove_blocks(blocks_to_remove, None, cx);
445 })
446 .ok();
447
448 let status = match &self.kernel {
449 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
450 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
451 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
452 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
453 Kernel::Shutdown => ExecutionStatus::Shutdown,
454 };
455
456 let parent_message_id = message.header.msg_id.clone();
457 let session_view = cx.view().downgrade();
458 let weak_editor = self.editor.clone();
459
460 let on_close: CloseBlockFn =
461 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
462 if let Some(session) = session_view.upgrade() {
463 session.update(cx, |session, cx| {
464 session.blocks.remove(&parent_message_id);
465 cx.notify();
466 });
467 }
468
469 if let Some(editor) = weak_editor.upgrade() {
470 editor.update(cx, |editor, cx| {
471 let mut block_ids = HashSet::default();
472 block_ids.insert(block_id);
473 editor.remove_blocks(block_ids, None, cx);
474 });
475 }
476 });
477
478 let Ok(editor_block) =
479 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
480 else {
481 return;
482 };
483
484 let new_cursor_pos = if let Some(next_cursor) = next_cell {
485 next_cursor
486 } else {
487 editor_block.invalidation_anchor
488 };
489
490 self.blocks
491 .insert(message.header.msg_id.clone(), editor_block);
492
493 match &self.kernel {
494 Kernel::RunningKernel(_) => {
495 self.send(message, cx).ok();
496 }
497 Kernel::StartingKernel(task) => {
498 // Queue up the execution as a task to run after the kernel starts
499 let task = task.clone();
500 let message = message.clone();
501
502 cx.spawn(|this, mut cx| async move {
503 task.await;
504 this.update(&mut cx, |session, cx| {
505 session.send(message, cx).ok();
506 })
507 .ok();
508 })
509 .detach();
510 }
511 _ => {}
512 }
513
514 // Now move the cursor to after the block
515 editor.update(cx, move |editor, cx| {
516 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
517 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
518 });
519 });
520 }
521
522 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
523 let parent_message_id = match message.parent_header.as_ref() {
524 Some(header) => &header.msg_id,
525 None => return,
526 };
527
528 match &message.content {
529 JupyterMessageContent::Status(status) => {
530 self.kernel.set_execution_state(&status.execution_state);
531
532 self.telemetry.report_repl_event(
533 self.kernel_specification.kernelspec.language.clone(),
534 KernelStatus::from(&self.kernel).to_string(),
535 cx.entity_id().to_string(),
536 );
537
538 cx.notify();
539 }
540 JupyterMessageContent::KernelInfoReply(reply) => {
541 self.kernel.set_kernel_info(&reply);
542 cx.notify();
543 }
544 _ => {}
545 }
546
547 if let Some(block) = self.blocks.get_mut(parent_message_id) {
548 block.handle_message(&message, cx);
549 return;
550 }
551 }
552
553 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
554 match &mut self.kernel {
555 Kernel::RunningKernel(_kernel) => {
556 self.send(InterruptRequest {}.into(), cx).ok();
557 }
558 Kernel::StartingKernel(_task) => {
559 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
560 }
561 _ => {}
562 }
563 }
564
565 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
566 if let Kernel::Shutdown = kernel {
567 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
568 }
569
570 let kernel_status = KernelStatus::from(&kernel).to_string();
571 let kernel_language = self.kernel_specification.kernelspec.language.clone();
572
573 self.telemetry.report_repl_event(
574 kernel_language,
575 kernel_status,
576 cx.entity_id().to_string(),
577 );
578
579 self.kernel = kernel;
580 }
581
582 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
583 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
584
585 match kernel {
586 Kernel::RunningKernel(mut kernel) => {
587 let mut request_tx = kernel.request_tx.clone();
588
589 cx.spawn(|this, mut cx| async move {
590 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
591 request_tx.try_send(message).ok();
592
593 // Give the kernel a bit of time to clean up
594 cx.background_executor().timer(Duration::from_secs(3)).await;
595
596 kernel.process.kill().ok();
597
598 this.update(&mut cx, |session, cx| {
599 session.clear_outputs(cx);
600 session.kernel(Kernel::Shutdown, cx);
601 cx.notify();
602 })
603 .ok();
604 })
605 .detach();
606 }
607 Kernel::StartingKernel(_kernel) => {
608 self.kernel = Kernel::Shutdown;
609 }
610 _ => {
611 self.kernel = Kernel::Shutdown;
612 }
613 }
614 cx.notify();
615 }
616}
617
/// Events emitted by a `Session`.
pub enum SessionEvent {
    /// Emitted when the session's kernel is set to `Kernel::Shutdown` through
    /// `Session::kernel`; carries the session's weak editor handle.
    Shutdown(WeakView<Editor>),
}

// Allows `Session` to emit `SessionEvent`s.
impl EventEmitter<SessionEvent> for Session {}
623
624impl Render for Session {
625 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
626 let (status_text, interrupt_button) = match &self.kernel {
627 Kernel::RunningKernel(kernel) => (
628 kernel
629 .kernel_info
630 .as_ref()
631 .map(|info| info.language_info.name.clone()),
632 Some(
633 Button::new("interrupt", "Interrupt")
634 .style(ButtonStyle::Subtle)
635 .on_click(cx.listener(move |session, _, cx| {
636 session.interrupt(cx);
637 })),
638 ),
639 ),
640 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
641 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
642 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
643 Kernel::Shutdown => (Some("Shutdown".into()), None),
644 };
645
646 KernelListItem::new(self.kernel_specification.clone())
647 .status_color(match &self.kernel {
648 Kernel::RunningKernel(kernel) => match kernel.execution_state {
649 ExecutionState::Idle => Color::Success,
650 ExecutionState::Busy => Color::Modified,
651 },
652 Kernel::StartingKernel(_) => Color::Modified,
653 Kernel::ErroredLaunch(_) => Color::Error,
654 Kernel::ShuttingDown => Color::Modified,
655 Kernel::Shutdown => Color::Disabled,
656 })
657 .child(Label::new(self.kernel_specification.name.clone()))
658 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
659 .button(
660 Button::new("shutdown", "Shutdown")
661 .style(ButtonStyle::Subtle)
662 .disabled(self.kernel.is_shutting_down())
663 .on_click(cx.listener(move |session, _, cx| {
664 session.shutdown(cx);
665 })),
666 )
667 .buttons(interrupt_button)
668 }
669}