1use crate::components::KernelListItem;
2use crate::KernelStatus;
3use crate::{
4 kernels::{Kernel, KernelSpecification, RunningKernel},
5 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
6};
7use client::telemetry::Telemetry;
8use collections::{HashMap, HashSet};
9use editor::{
10 display_map::{
11 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
12 RenderBlock,
13 },
14 scroll::Autoscroll,
15 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
16};
17use futures::io::BufReader;
18use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
19use gpui::{
20 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
21 WeakView,
22};
23use language::Point;
24use project::Fs;
25use runtimelib::{
26 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
27 ShutdownRequest,
28};
29use settings::Settings as _;
30use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
31use theme::{ActiveTheme, ThemeSettings};
32use ui::{prelude::*, IconButtonShape, Tooltip};
33
/// A REPL session: one kernel attached to one editor, together with the output
/// blocks created by code executed through it.
pub struct Session {
    /// Weak handle to the editor that output blocks are inserted into and removed from.
    editor: WeakView<Editor>,
    /// Current kernel state (starting, running, errored launch, shutting down, or shut down).
    pub kernel: Kernel,
    /// Output blocks keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that passes messages received from the kernel to `route`; a ready
    /// no-op task until the kernel has started.
    messaging_task: Task<()>,
    /// Specification of the kernel this session was created for.
    pub kernel_specification: KernelSpecification,
    /// Telemetry handle used to report REPL events (kernel status changes).
    telemetry: Arc<Telemetry>,
    /// Subscription to the editor's buffer events (handled by `on_buffer_event`),
    /// stored here so it lives as long as the session.
    _buffer_subscription: Subscription,
}
43
/// An output area shown below a range of executed code.
struct EditorBlock {
    /// The range of code whose execution produced this block.
    code_range: Range<Anchor>,
    /// Anchor placed at the start of the row after the code range; once it is no
    /// longer valid in the buffer, the block is removed.
    invalidation_anchor: Anchor,
    /// Id of the custom block inserted into the editor for this output.
    block_id: CustomBlockId,
    /// View that receives kernel messages and renders the output.
    execution_view: View<ExecutionView>,
}
50
/// Callback invoked with the id of an output block when its close button is clicked.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
53
54impl EditorBlock {
55 fn new(
56 editor: WeakView<Editor>,
57 code_range: Range<Anchor>,
58 status: ExecutionStatus,
59 on_close: CloseBlockFn,
60 cx: &mut ViewContext<Session>,
61 ) -> anyhow::Result<Self> {
62 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
63
64 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
65 let buffer = editor.buffer().clone();
66 let buffer_snapshot = buffer.read(cx).snapshot(cx);
67 let end_point = code_range.end.to_point(&buffer_snapshot);
68 let next_row_start = end_point + Point::new(1, 0);
69 if next_row_start > buffer_snapshot.max_point() {
70 buffer.update(cx, |buffer, cx| {
71 buffer.edit(
72 [(
73 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
74 "\n",
75 )],
76 None,
77 cx,
78 )
79 });
80 }
81
82 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
83 let block = BlockProperties {
84 position: code_range.end,
85 height: (execution_view.num_lines(cx) + 1) as u32,
86 style: BlockStyle::Sticky,
87 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
88 disposition: BlockDisposition::Below,
89 };
90
91 let block_id = editor.insert_blocks([block], None, cx)[0];
92 (block_id, invalidation_anchor)
93 })?;
94
95 anyhow::Ok(Self {
96 code_range,
97 invalidation_anchor,
98 block_id,
99 execution_view,
100 })
101 }
102
103 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104 self.execution_view.update(cx, |execution_view, cx| {
105 execution_view.push_message(&message.content, cx);
106 });
107 }
108
109 fn create_output_area_renderer(
110 execution_view: View<ExecutionView>,
111 on_close: CloseBlockFn,
112 ) -> RenderBlock {
113 let render = move |cx: &mut BlockContext| {
114 let execution_view = execution_view.clone();
115 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117
118 let gutter = cx.gutter_dimensions;
119 let close_button_size = IconSize::XSmall;
120
121 let block_id = cx.block_id;
122 let on_close = on_close.clone();
123
124 let rem_size = cx.rem_size();
125 let line_height = cx.text_style().line_height_in_pixels(rem_size);
126
127 let (close_button_width, close_button_padding) =
128 close_button_size.square_components(cx);
129
130 div()
131 .min_h(line_height)
132 .flex()
133 .flex_row()
134 .items_start()
135 .w_full()
136 .bg(cx.theme().colors().background)
137 .border_y_1()
138 .border_color(cx.theme().colors().border)
139 .child(
140 v_flex().min_h(cx.line_height()).justify_center().child(
141 h_flex()
142 .w(gutter.full_width())
143 .justify_end()
144 .pt(line_height / 2.)
145 .child(
146 h_flex()
147 .pr(gutter.width / 2. - close_button_width
148 + close_button_padding / 2.)
149 .child(
150 IconButton::new(
151 ("close_output_area", EntityId::from(cx.block_id)),
152 IconName::Close,
153 )
154 .shape(IconButtonShape::Square)
155 .icon_size(close_button_size)
156 .icon_color(Color::Muted)
157 .tooltip(|cx| Tooltip::text("Close output area", cx))
158 .on_click(
159 move |_, cx| {
160 if let BlockId::Custom(block_id) = block_id {
161 (on_close)(block_id, cx)
162 }
163 },
164 ),
165 ),
166 ),
167 ),
168 )
169 .child(
170 div()
171 .flex_1()
172 .size_full()
173 .my_2()
174 .mr(gutter.width)
175 .text_size(text_font_size)
176 .font_family(text_font)
177 .child(execution_view),
178 )
179 .into_any_element()
180 };
181
182 Box::new(render)
183 }
184}
185
186impl Session {
187 pub fn new(
188 editor: WeakView<Editor>,
189 fs: Arc<dyn Fs>,
190 telemetry: Arc<Telemetry>,
191 kernel_specification: KernelSpecification,
192 cx: &mut ViewContext<Self>,
193 ) -> Self {
194 let kernel_language = kernel_specification.kernelspec.language.clone();
195
196 telemetry.report_repl_event(
197 kernel_language.clone(),
198 KernelStatus::Starting.to_string(),
199 cx.entity_id().to_string(),
200 );
201
202 let entity_id = editor.entity_id();
203 let working_directory = editor
204 .upgrade()
205 .and_then(|editor| editor.read(cx).working_directory(cx))
206 .unwrap_or_else(temp_dir);
207 let kernel = RunningKernel::new(
208 kernel_specification.clone(),
209 entity_id,
210 working_directory,
211 fs.clone(),
212 cx,
213 );
214
215 let pending_kernel = cx
216 .spawn(|this, mut cx| async move {
217 let kernel = kernel.await;
218
219 match kernel {
220 Ok((mut kernel, mut messages_rx)) => {
221 this.update(&mut cx, |session, cx| {
222 // At this point we can create a new kind of kernel that has the process and our long running background tasks
223
224 let stderr = kernel.process.stderr.take();
225
226 cx.spawn(|_session, mut _cx| async move {
227 if let None = stderr {
228 return;
229 }
230 let reader = BufReader::new(stderr.unwrap());
231 let mut lines = reader.lines();
232 while let Some(Ok(line)) = lines.next().await {
233 log::error!("kernel: {}", line);
234 }
235 })
236 .detach();
237
238 let stdout = kernel.process.stderr.take();
239
240 cx.spawn(|_session, mut _cx| async move {
241 if let None = stdout {
242 return;
243 }
244 let reader = BufReader::new(stdout.unwrap());
245 let mut lines = reader.lines();
246 while let Some(Ok(line)) = lines.next().await {
247 log::info!("kernel: {}", line);
248 }
249 })
250 .detach();
251
252 let status = kernel.process.status();
253 session.kernel(Kernel::RunningKernel(kernel), cx);
254
255 cx.spawn(|session, mut cx| async move {
256 let error_message = match status.await {
257 Ok(status) => {
258 if status.success() {
259 log::info!("kernel process exited successfully");
260 return;
261 }
262
263 format!("kernel process exited with status: {:?}", status)
264 }
265 Err(err) => {
266 format!("kernel process exited with error: {:?}", err)
267 }
268 };
269
270 log::error!("{}", error_message);
271
272 session
273 .update(&mut cx, |session, cx| {
274 session.kernel(
275 Kernel::ErroredLaunch(error_message.clone()),
276 cx,
277 );
278
279 session.blocks.values().for_each(|block| {
280 block.execution_view.update(
281 cx,
282 |execution_view, cx| {
283 match execution_view.status {
284 ExecutionStatus::Finished => {
285 // Do nothing when the output was good
286 }
287 _ => {
288 // All other cases, set the status to errored
289 execution_view.status =
290 ExecutionStatus::KernelErrored(
291 error_message.clone(),
292 )
293 }
294 }
295 cx.notify();
296 },
297 );
298 });
299
300 cx.notify();
301 })
302 .ok();
303 })
304 .detach();
305
306 session.messaging_task = cx.spawn(|session, mut cx| async move {
307 while let Some(message) = messages_rx.next().await {
308 session
309 .update(&mut cx, |session, cx| {
310 session.route(&message, cx);
311 })
312 .ok();
313 }
314 });
315
316 // todo!(kyle): send kernelinforequest once our shell channel read/writes are split
317 // cx.spawn(|this, mut cx| async move {
318 // cx.background_executor()
319 // .timer(Duration::from_millis(120))
320 // .await;
321 // this.update(&mut cx, |this, cx| {
322 // this.send(KernelInfoRequest {}.into(), cx).ok();
323 // })
324 // .ok();
325 // })
326 // .detach();
327 })
328 .ok();
329 }
330 Err(err) => {
331 this.update(&mut cx, |session, cx| {
332 session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
333 })
334 .ok();
335 }
336 }
337 })
338 .shared();
339
340 let subscription = match editor.upgrade() {
341 Some(editor) => {
342 let buffer = editor.read(cx).buffer().clone();
343 cx.subscribe(&buffer, Self::on_buffer_event)
344 }
345 None => Subscription::new(|| {}),
346 };
347
348 return Self {
349 editor,
350 kernel: Kernel::StartingKernel(pending_kernel),
351 messaging_task: Task::ready(()),
352 blocks: HashMap::default(),
353 kernel_specification,
354 _buffer_subscription: subscription,
355 telemetry,
356 };
357 }
358
359 fn on_buffer_event(
360 &mut self,
361 buffer: Model<MultiBuffer>,
362 event: &multi_buffer::Event,
363 cx: &mut ViewContext<Self>,
364 ) {
365 if let multi_buffer::Event::Edited { .. } = event {
366 let snapshot = buffer.read(cx).snapshot(cx);
367
368 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
369
370 self.blocks.retain(|_id, block| {
371 if block.invalidation_anchor.is_valid(&snapshot) {
372 true
373 } else {
374 blocks_to_remove.insert(block.block_id);
375 false
376 }
377 });
378
379 if !blocks_to_remove.is_empty() {
380 self.editor
381 .update(cx, |editor, cx| {
382 editor.remove_blocks(blocks_to_remove, None, cx);
383 })
384 .ok();
385 cx.notify();
386 }
387 }
388 }
389
390 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
391 match &mut self.kernel {
392 Kernel::RunningKernel(kernel) => {
393 kernel.request_tx.try_send(message).ok();
394 }
395 _ => {}
396 }
397
398 anyhow::Ok(())
399 }
400
401 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
402 let blocks_to_remove: HashSet<CustomBlockId> =
403 self.blocks.values().map(|block| block.block_id).collect();
404
405 self.editor
406 .update(cx, |editor, cx| {
407 editor.remove_blocks(blocks_to_remove, None, cx);
408 })
409 .ok();
410
411 self.blocks.clear();
412 }
413
414 pub fn execute(
415 &mut self,
416 code: String,
417 anchor_range: Range<Anchor>,
418 next_cell: Option<Anchor>,
419 cx: &mut ViewContext<Self>,
420 ) {
421 let Some(editor) = self.editor.upgrade() else {
422 return;
423 };
424
425 if code.is_empty() {
426 return;
427 }
428
429 let execute_request = ExecuteRequest {
430 code,
431 ..ExecuteRequest::default()
432 };
433
434 let message: JupyterMessage = execute_request.into();
435
436 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
437
438 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
439
440 self.blocks.retain(|_key, block| {
441 if anchor_range.overlaps(&block.code_range, &buffer) {
442 blocks_to_remove.insert(block.block_id);
443 false
444 } else {
445 true
446 }
447 });
448
449 self.editor
450 .update(cx, |editor, cx| {
451 editor.remove_blocks(blocks_to_remove, None, cx);
452 })
453 .ok();
454
455 let status = match &self.kernel {
456 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
457 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
458 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
459 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
460 Kernel::Shutdown => ExecutionStatus::Shutdown,
461 };
462
463 let parent_message_id = message.header.msg_id.clone();
464 let session_view = cx.view().downgrade();
465 let weak_editor = self.editor.clone();
466
467 let on_close: CloseBlockFn =
468 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
469 if let Some(session) = session_view.upgrade() {
470 session.update(cx, |session, cx| {
471 session.blocks.remove(&parent_message_id);
472 cx.notify();
473 });
474 }
475
476 if let Some(editor) = weak_editor.upgrade() {
477 editor.update(cx, |editor, cx| {
478 let mut block_ids = HashSet::default();
479 block_ids.insert(block_id);
480 editor.remove_blocks(block_ids, None, cx);
481 });
482 }
483 });
484
485 let Ok(editor_block) =
486 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
487 else {
488 return;
489 };
490
491 let new_cursor_pos = if let Some(next_cursor) = next_cell {
492 next_cursor
493 } else {
494 editor_block.invalidation_anchor
495 };
496
497 self.blocks
498 .insert(message.header.msg_id.clone(), editor_block);
499
500 match &self.kernel {
501 Kernel::RunningKernel(_) => {
502 self.send(message, cx).ok();
503 }
504 Kernel::StartingKernel(task) => {
505 // Queue up the execution as a task to run after the kernel starts
506 let task = task.clone();
507 let message = message.clone();
508
509 cx.spawn(|this, mut cx| async move {
510 task.await;
511 this.update(&mut cx, |session, cx| {
512 session.send(message, cx).ok();
513 })
514 .ok();
515 })
516 .detach();
517 }
518 _ => {}
519 }
520
521 // Now move the cursor to after the block
522 editor.update(cx, move |editor, cx| {
523 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
524 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
525 });
526 });
527 }
528
529 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
530 let parent_message_id = match message.parent_header.as_ref() {
531 Some(header) => &header.msg_id,
532 None => return,
533 };
534
535 match &message.content {
536 JupyterMessageContent::Status(status) => {
537 self.kernel.set_execution_state(&status.execution_state);
538
539 self.telemetry.report_repl_event(
540 self.kernel_specification.kernelspec.language.clone(),
541 KernelStatus::from(&self.kernel).to_string(),
542 cx.entity_id().to_string(),
543 );
544
545 cx.notify();
546 }
547 JupyterMessageContent::KernelInfoReply(reply) => {
548 self.kernel.set_kernel_info(&reply);
549 cx.notify();
550 }
551 _ => {}
552 }
553
554 if let Some(block) = self.blocks.get_mut(parent_message_id) {
555 block.handle_message(&message, cx);
556 return;
557 }
558 }
559
560 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
561 match &mut self.kernel {
562 Kernel::RunningKernel(_kernel) => {
563 self.send(InterruptRequest {}.into(), cx).ok();
564 }
565 Kernel::StartingKernel(_task) => {
566 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
567 }
568 _ => {}
569 }
570 }
571
572 pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
573 if let Kernel::Shutdown = kernel {
574 cx.emit(SessionEvent::Shutdown(self.editor.clone()));
575 }
576
577 let kernel_status = KernelStatus::from(&kernel).to_string();
578 let kernel_language = self.kernel_specification.kernelspec.language.clone();
579
580 self.telemetry.report_repl_event(
581 kernel_language,
582 kernel_status,
583 cx.entity_id().to_string(),
584 );
585
586 self.kernel = kernel;
587 }
588
589 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
590 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
591
592 match kernel {
593 Kernel::RunningKernel(mut kernel) => {
594 let mut request_tx = kernel.request_tx.clone();
595
596 cx.spawn(|this, mut cx| async move {
597 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
598 request_tx.try_send(message).ok();
599
600 // Give the kernel a bit of time to clean up
601 cx.background_executor().timer(Duration::from_secs(3)).await;
602
603 kernel.process.kill().ok();
604
605 this.update(&mut cx, |session, cx| {
606 session.clear_outputs(cx);
607 session.kernel(Kernel::Shutdown, cx);
608 cx.notify();
609 })
610 .ok();
611 })
612 .detach();
613 }
614 Kernel::StartingKernel(_kernel) => {
615 self.kernel = Kernel::Shutdown;
616 }
617 _ => {
618 self.kernel = Kernel::Shutdown;
619 }
620 }
621 cx.notify();
622 }
623}
624
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// The kernel reached the `Kernel::Shutdown` state; carries the weak handle
    /// of the session's editor.
    Shutdown(WeakView<Editor>),
}
628
// Lets `Session` emit `SessionEvent`s through `cx.emit`.
impl EventEmitter<SessionEvent> for Session {}
630
631impl Render for Session {
632 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
633 let (status_text, interrupt_button) = match &self.kernel {
634 Kernel::RunningKernel(kernel) => (
635 kernel
636 .kernel_info
637 .as_ref()
638 .map(|info| info.language_info.name.clone()),
639 Some(
640 Button::new("interrupt", "Interrupt")
641 .style(ButtonStyle::Subtle)
642 .on_click(cx.listener(move |session, _, cx| {
643 session.interrupt(cx);
644 })),
645 ),
646 ),
647 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
648 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
649 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
650 Kernel::Shutdown => (Some("Shutdown".into()), None),
651 };
652
653 KernelListItem::new(self.kernel_specification.clone())
654 .status_color(match &self.kernel {
655 Kernel::RunningKernel(kernel) => match kernel.execution_state {
656 ExecutionState::Idle => Color::Success,
657 ExecutionState::Busy => Color::Modified,
658 },
659 Kernel::StartingKernel(_) => Color::Modified,
660 Kernel::ErroredLaunch(_) => Color::Error,
661 Kernel::ShuttingDown => Color::Modified,
662 Kernel::Shutdown => Color::Disabled,
663 })
664 .child(Label::new(self.kernel_specification.name.clone()))
665 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
666 .button(
667 Button::new("shutdown", "Shutdown")
668 .style(ButtonStyle::Subtle)
669 .disabled(self.kernel.is_shutting_down())
670 .on_click(cx.listener(move |session, _, cx| {
671 session.shutdown(cx);
672 })),
673 )
674 .buttons(interrupt_button)
675 }
676}