1use crate::components::KernelListItem;
2use crate::{
3 kernels::{Kernel, KernelSpecification, RunningKernel},
4 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
5};
6use collections::{HashMap, HashSet};
7use editor::{
8 display_map::{
9 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
10 RenderBlock,
11 },
12 scroll::Autoscroll,
13 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
14};
15use futures::{FutureExt as _, StreamExt as _};
16use gpui::{
17 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
18 WeakView,
19};
20use language::Point;
21use project::Fs;
22use runtimelib::{
23 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
24 ShutdownRequest,
25};
26use settings::Settings as _;
27use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
28use theme::{ActiveTheme, ThemeSettings};
29use ui::{prelude::*, IconButtonShape, Tooltip};
30
31pub struct Session {
32 editor: WeakView<Editor>,
33 pub kernel: Kernel,
34 blocks: HashMap<String, EditorBlock>,
35 messaging_task: Task<()>,
36 pub kernel_specification: KernelSpecification,
37 _buffer_subscription: Subscription,
38}
39
40struct EditorBlock {
41 editor: WeakView<Editor>,
42 code_range: Range<Anchor>,
43 invalidation_anchor: Anchor,
44 block_id: CustomBlockId,
45 execution_view: View<ExecutionView>,
46 on_close: CloseBlockFn,
47}
48
49type CloseBlockFn =
50 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
51
52impl EditorBlock {
53 fn new(
54 editor: WeakView<Editor>,
55 code_range: Range<Anchor>,
56 status: ExecutionStatus,
57 on_close: CloseBlockFn,
58 cx: &mut ViewContext<Session>,
59 ) -> anyhow::Result<Self> {
60 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
61
62 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
63 let buffer = editor.buffer().clone();
64 let buffer_snapshot = buffer.read(cx).snapshot(cx);
65 let end_point = code_range.end.to_point(&buffer_snapshot);
66 let next_row_start = end_point + Point::new(1, 0);
67 if next_row_start > buffer_snapshot.max_point() {
68 buffer.update(cx, |buffer, cx| {
69 buffer.edit(
70 [(
71 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
72 "\n",
73 )],
74 None,
75 cx,
76 )
77 });
78 }
79
80 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
81 let block = BlockProperties {
82 position: code_range.end,
83 height: execution_view.num_lines(cx).saturating_add(1),
84 style: BlockStyle::Sticky,
85 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
86 disposition: BlockDisposition::Below,
87 };
88
89 let block_id = editor.insert_blocks([block], None, cx)[0];
90 (block_id, invalidation_anchor)
91 })?;
92
93 anyhow::Ok(Self {
94 editor,
95 code_range,
96 invalidation_anchor,
97 block_id,
98 execution_view,
99 on_close,
100 })
101 }
102
103 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104 self.execution_view.update(cx, |execution_view, cx| {
105 execution_view.push_message(&message.content, cx);
106 });
107
108 self.editor
109 .update(cx, |editor, cx| {
110 let mut replacements = HashMap::default();
111
112 replacements.insert(
113 self.block_id,
114 (
115 Some(self.execution_view.num_lines(cx).saturating_add(1)),
116 Self::create_output_area_renderer(
117 self.execution_view.clone(),
118 self.on_close.clone(),
119 ),
120 ),
121 );
122 editor.replace_blocks(replacements, None, cx);
123 })
124 .ok();
125 }
126
127 fn create_output_area_renderer(
128 execution_view: View<ExecutionView>,
129 on_close: CloseBlockFn,
130 ) -> RenderBlock {
131 let render = move |cx: &mut BlockContext| {
132 let execution_view = execution_view.clone();
133 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
134 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
135
136 let gutter = cx.gutter_dimensions;
137 let close_button_size = IconSize::XSmall;
138
139 let block_id = cx.block_id;
140 let on_close = on_close.clone();
141
142 let rem_size = cx.rem_size();
143 let line_height = cx.text_style().line_height_in_pixels(rem_size);
144
145 let (close_button_width, close_button_padding) =
146 close_button_size.square_components(cx);
147
148 div()
149 .min_h(line_height)
150 .flex()
151 .flex_row()
152 .items_start()
153 .w_full()
154 .bg(cx.theme().colors().background)
155 .border_y_1()
156 .border_color(cx.theme().colors().border)
157 .child(
158 v_flex().min_h(cx.line_height()).justify_center().child(
159 h_flex()
160 .w(gutter.full_width())
161 .justify_end()
162 .pt(line_height / 2.)
163 .child(
164 h_flex()
165 .pr(gutter.width / 2. - close_button_width
166 + close_button_padding / 2.)
167 .child(
168 IconButton::new(
169 ("close_output_area", EntityId::from(cx.block_id)),
170 IconName::Close,
171 )
172 .shape(IconButtonShape::Square)
173 .icon_size(close_button_size)
174 .icon_color(Color::Muted)
175 .tooltip(|cx| Tooltip::text("Close output area", cx))
176 .on_click(
177 move |_, cx| {
178 if let BlockId::Custom(block_id) = block_id {
179 (on_close)(block_id, cx)
180 }
181 },
182 ),
183 ),
184 ),
185 ),
186 )
187 .child(
188 div()
189 .flex_1()
190 .size_full()
191 .my_2()
192 .mr(gutter.width)
193 .text_size(text_font_size)
194 .font_family(text_font)
195 .child(execution_view),
196 )
197 .into_any_element()
198 };
199
200 Box::new(render)
201 }
202}
203
204impl Session {
205 pub fn new(
206 editor: WeakView<Editor>,
207 fs: Arc<dyn Fs>,
208 kernel_specification: KernelSpecification,
209 cx: &mut ViewContext<Self>,
210 ) -> Self {
211 let entity_id = editor.entity_id();
212 let working_directory = editor
213 .upgrade()
214 .and_then(|editor| editor.read(cx).working_directory(cx))
215 .unwrap_or_else(temp_dir);
216 let kernel = RunningKernel::new(
217 kernel_specification.clone(),
218 entity_id,
219 working_directory,
220 fs.clone(),
221 cx,
222 );
223
224 let pending_kernel = cx
225 .spawn(|this, mut cx| async move {
226 let kernel = kernel.await;
227
228 match kernel {
229 Ok((mut kernel, mut messages_rx)) => {
230 this.update(&mut cx, |this, cx| {
231 // At this point we can create a new kind of kernel that has the process and our long running background tasks
232
233 let status = kernel.process.status();
234 this.kernel = Kernel::RunningKernel(kernel);
235
236 cx.spawn(|session, mut cx| async move {
237 let error_message = match status.await {
238 Ok(status) => {
239 if status.success() {
240 log::info!("kernel process exited successfully");
241 return;
242 }
243
244 format!("kernel process exited with status: {:?}", status)
245 }
246 Err(err) => {
247 format!("kernel process exited with error: {:?}", err)
248 }
249 };
250
251 log::error!("{}", error_message);
252
253 session
254 .update(&mut cx, |session, cx| {
255 session.kernel =
256 Kernel::ErroredLaunch(error_message.clone());
257
258 session.blocks.values().for_each(|block| {
259 block.execution_view.update(
260 cx,
261 |execution_view, cx| {
262 match execution_view.status {
263 ExecutionStatus::Finished => {
264 // Do nothing when the output was good
265 }
266 _ => {
267 // All other cases, set the status to errored
268 execution_view.status =
269 ExecutionStatus::KernelErrored(
270 error_message.clone(),
271 )
272 }
273 }
274 cx.notify();
275 },
276 );
277 });
278
279 cx.notify();
280 })
281 .ok();
282 })
283 .detach();
284
285 this.messaging_task = cx.spawn(|session, mut cx| async move {
286 while let Some(message) = messages_rx.next().await {
287 session
288 .update(&mut cx, |session, cx| {
289 session.route(&message, cx);
290 })
291 .ok();
292 }
293 });
294 })
295 .ok();
296 }
297 Err(err) => {
298 this.update(&mut cx, |this, _cx| {
299 this.kernel = Kernel::ErroredLaunch(err.to_string());
300 })
301 .ok();
302 }
303 }
304 })
305 .shared();
306
307 let subscription = match editor.upgrade() {
308 Some(editor) => {
309 let buffer = editor.read(cx).buffer().clone();
310 cx.subscribe(&buffer, Self::on_buffer_event)
311 }
312 None => Subscription::new(|| {}),
313 };
314
315 return Self {
316 editor,
317 kernel: Kernel::StartingKernel(pending_kernel),
318 messaging_task: Task::ready(()),
319 blocks: HashMap::default(),
320 kernel_specification,
321 _buffer_subscription: subscription,
322 };
323 }
324
325 fn on_buffer_event(
326 &mut self,
327 buffer: Model<MultiBuffer>,
328 event: &multi_buffer::Event,
329 cx: &mut ViewContext<Self>,
330 ) {
331 if let multi_buffer::Event::Edited { .. } = event {
332 let snapshot = buffer.read(cx).snapshot(cx);
333
334 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
335
336 self.blocks.retain(|_id, block| {
337 if block.invalidation_anchor.is_valid(&snapshot) {
338 true
339 } else {
340 blocks_to_remove.insert(block.block_id);
341 false
342 }
343 });
344
345 if !blocks_to_remove.is_empty() {
346 self.editor
347 .update(cx, |editor, cx| {
348 editor.remove_blocks(blocks_to_remove, None, cx);
349 })
350 .ok();
351 cx.notify();
352 }
353 }
354 }
355
356 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
357 match &mut self.kernel {
358 Kernel::RunningKernel(kernel) => {
359 kernel.request_tx.try_send(message).ok();
360 }
361 _ => {}
362 }
363
364 anyhow::Ok(())
365 }
366
367 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
368 let blocks_to_remove: HashSet<CustomBlockId> =
369 self.blocks.values().map(|block| block.block_id).collect();
370
371 self.editor
372 .update(cx, |editor, cx| {
373 editor.remove_blocks(blocks_to_remove, None, cx);
374 })
375 .ok();
376
377 self.blocks.clear();
378 }
379
380 pub fn execute(
381 &mut self,
382 code: String,
383 anchor_range: Range<Anchor>,
384 next_cell: Option<Anchor>,
385 cx: &mut ViewContext<Self>,
386 ) {
387 let Some(editor) = self.editor.upgrade() else {
388 return;
389 };
390
391 if code.is_empty() {
392 return;
393 }
394
395 let execute_request = ExecuteRequest {
396 code,
397 ..ExecuteRequest::default()
398 };
399
400 let message: JupyterMessage = execute_request.into();
401
402 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
403
404 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
405
406 self.blocks.retain(|_key, block| {
407 if anchor_range.overlaps(&block.code_range, &buffer) {
408 blocks_to_remove.insert(block.block_id);
409 false
410 } else {
411 true
412 }
413 });
414
415 self.editor
416 .update(cx, |editor, cx| {
417 editor.remove_blocks(blocks_to_remove, None, cx);
418 })
419 .ok();
420
421 let status = match &self.kernel {
422 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
423 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
424 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
425 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
426 Kernel::Shutdown => ExecutionStatus::Shutdown,
427 };
428
429 let parent_message_id = message.header.msg_id.clone();
430 let session_view = cx.view().downgrade();
431 let weak_editor = self.editor.clone();
432
433 let on_close: CloseBlockFn =
434 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
435 if let Some(session) = session_view.upgrade() {
436 session.update(cx, |session, cx| {
437 session.blocks.remove(&parent_message_id);
438 cx.notify();
439 });
440 }
441
442 if let Some(editor) = weak_editor.upgrade() {
443 editor.update(cx, |editor, cx| {
444 let mut block_ids = HashSet::default();
445 block_ids.insert(block_id);
446 editor.remove_blocks(block_ids, None, cx);
447 });
448 }
449 });
450
451 let Ok(editor_block) =
452 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
453 else {
454 return;
455 };
456
457 let new_cursor_pos = if let Some(next_cursor) = next_cell {
458 next_cursor
459 } else {
460 editor_block.invalidation_anchor
461 };
462
463 self.blocks
464 .insert(message.header.msg_id.clone(), editor_block);
465
466 match &self.kernel {
467 Kernel::RunningKernel(_) => {
468 self.send(message, cx).ok();
469 }
470 Kernel::StartingKernel(task) => {
471 // Queue up the execution as a task to run after the kernel starts
472 let task = task.clone();
473 let message = message.clone();
474
475 cx.spawn(|this, mut cx| async move {
476 task.await;
477 this.update(&mut cx, |this, cx| {
478 this.send(message, cx).ok();
479 })
480 .ok();
481 })
482 .detach();
483 }
484 _ => {}
485 }
486
487 // Now move the cursor to after the block
488 editor.update(cx, move |editor, cx| {
489 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
490 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
491 });
492 });
493 }
494
495 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
496 let parent_message_id = match message.parent_header.as_ref() {
497 Some(header) => &header.msg_id,
498 None => return,
499 };
500
501 match &message.content {
502 JupyterMessageContent::Status(status) => {
503 self.kernel.set_execution_state(&status.execution_state);
504 cx.notify();
505 }
506 JupyterMessageContent::KernelInfoReply(reply) => {
507 self.kernel.set_kernel_info(&reply);
508 cx.notify();
509 }
510 _ => {}
511 }
512
513 if let Some(block) = self.blocks.get_mut(parent_message_id) {
514 block.handle_message(&message, cx);
515 return;
516 }
517 }
518
519 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
520 match &mut self.kernel {
521 Kernel::RunningKernel(_kernel) => {
522 self.send(InterruptRequest {}.into(), cx).ok();
523 }
524 Kernel::StartingKernel(_task) => {
525 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
526 }
527 _ => {}
528 }
529 }
530
531 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
532 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
533
534 match kernel {
535 Kernel::RunningKernel(mut kernel) => {
536 let mut request_tx = kernel.request_tx.clone();
537
538 cx.spawn(|this, mut cx| async move {
539 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
540 request_tx.try_send(message).ok();
541
542 // Give the kernel a bit of time to clean up
543 cx.background_executor().timer(Duration::from_secs(3)).await;
544
545 kernel.process.kill().ok();
546
547 this.update(&mut cx, |this, cx| {
548 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
549 this.clear_outputs(cx);
550 this.kernel = Kernel::Shutdown;
551 cx.notify();
552 })
553 .ok();
554 })
555 .detach();
556 }
557 Kernel::StartingKernel(_kernel) => {
558 self.kernel = Kernel::Shutdown;
559 }
560 _ => {
561 self.kernel = Kernel::Shutdown;
562 }
563 }
564 cx.notify();
565 }
566}
567
568pub enum SessionEvent {
569 Shutdown(WeakView<Editor>),
570}
571
572impl EventEmitter<SessionEvent> for Session {}
573
574impl Render for Session {
575 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
576 let (status_text, interrupt_button) = match &self.kernel {
577 Kernel::RunningKernel(kernel) => (
578 kernel
579 .kernel_info
580 .as_ref()
581 .map(|info| info.language_info.name.clone()),
582 Some(
583 Button::new("interrupt", "Interrupt")
584 .style(ButtonStyle::Subtle)
585 .on_click(cx.listener(move |session, _, cx| {
586 session.interrupt(cx);
587 })),
588 ),
589 ),
590 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
591 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
592 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
593 Kernel::Shutdown => (Some("Shutdown".into()), None),
594 };
595
596 KernelListItem::new(self.kernel_specification.clone())
597 .status_color(match &self.kernel {
598 Kernel::RunningKernel(kernel) => match kernel.execution_state {
599 ExecutionState::Idle => Color::Success,
600 ExecutionState::Busy => Color::Modified,
601 },
602 Kernel::StartingKernel(_) => Color::Modified,
603 Kernel::ErroredLaunch(_) => Color::Error,
604 Kernel::ShuttingDown => Color::Modified,
605 Kernel::Shutdown => Color::Disabled,
606 })
607 .child(Label::new(self.kernel_specification.name.clone()))
608 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
609 .button(
610 Button::new("shutdown", "Shutdown")
611 .style(ButtonStyle::Subtle)
612 .disabled(self.kernel.is_shutting_down())
613 .on_click(cx.listener(move |session, _, cx| {
614 session.shutdown(cx);
615 })),
616 )
617 .buttons(interrupt_button)
618 }
619}