1use crate::components::KernelListItem;
2use crate::{
3 kernels::{Kernel, KernelSpecification, RunningKernel},
4 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
5};
6use collections::{HashMap, HashSet};
7use editor::{
8 display_map::{
9 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
10 RenderBlock,
11 },
12 scroll::Autoscroll,
13 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
14};
15use futures::{FutureExt as _, StreamExt as _};
16use gpui::{
17 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
18 WeakView,
19};
20use language::Point;
21use project::Fs;
22use runtimelib::{
23 ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
24 ShutdownRequest,
25};
26use settings::Settings as _;
27use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
28use theme::{ActiveTheme, ThemeSettings};
29use ui::{prelude::*, IconButtonShape, Tooltip};
30
/// A REPL session: one editor paired with one kernel, plus the output blocks
/// that executions have inserted into that editor.
pub struct Session {
    /// Editor this session executes code for, held through a weak handle.
    editor: WeakView<Editor>,
    /// Current lifecycle state of the kernel backing this session.
    pub kernel: Kernel,
    /// Output blocks, keyed by the `msg_id` of the execute request that created them.
    blocks: HashMap<String, EditorBlock>,
    /// Task that pulls messages received from the kernel and hands them to `route`.
    messaging_task: Task<()>,
    /// Specification of the kernel this session was created with.
    pub kernel_specification: KernelSpecification,
    /// Subscription to the editor's buffer events (see `on_buffer_event`),
    /// stored on the session and never read directly.
    _buffer_subscription: Subscription,
}
39
/// The output area shown in the editor for a single execution.
struct EditorBlock {
    /// Editor the block was inserted into.
    editor: WeakView<Editor>,
    /// Range of the code that was executed to produce this block.
    code_range: Range<Anchor>,
    /// Anchor placed at the start of the row following the executed code; the
    /// session drops the block once this anchor is no longer valid.
    invalidation_anchor: Anchor,
    /// Identifier of the custom block inserted into the editor.
    block_id: CustomBlockId,
    /// View that receives the kernel's messages for this execution and renders them.
    execution_view: View<ExecutionView>,
    /// Callback invoked when the block's close button is clicked.
    on_close: CloseBlockFn,
}
48
/// Shared callback used to close an output block: receives the id of the block
/// being closed and the window context of the click that triggered it.
type CloseBlockFn =
    Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
51
52impl EditorBlock {
53 fn new(
54 editor: WeakView<Editor>,
55 code_range: Range<Anchor>,
56 status: ExecutionStatus,
57 on_close: CloseBlockFn,
58 cx: &mut ViewContext<Session>,
59 ) -> anyhow::Result<Self> {
60 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
61
62 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
63 let buffer = editor.buffer().clone();
64 let buffer_snapshot = buffer.read(cx).snapshot(cx);
65 let end_point = code_range.end.to_point(&buffer_snapshot);
66 let next_row_start = end_point + Point::new(1, 0);
67 if next_row_start > buffer_snapshot.max_point() {
68 buffer.update(cx, |buffer, cx| {
69 buffer.edit(
70 [(
71 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
72 "\n",
73 )],
74 None,
75 cx,
76 )
77 });
78 }
79
80 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
81 let block = BlockProperties {
82 position: code_range.end,
83 height: execution_view.num_lines(cx).saturating_add(1),
84 style: BlockStyle::Sticky,
85 render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
86 disposition: BlockDisposition::Below,
87 };
88
89 let block_id = editor.insert_blocks([block], None, cx)[0];
90 (block_id, invalidation_anchor)
91 })?;
92
93 anyhow::Ok(Self {
94 editor,
95 code_range,
96 invalidation_anchor,
97 block_id,
98 execution_view,
99 on_close,
100 })
101 }
102
103 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
104 self.execution_view.update(cx, |execution_view, cx| {
105 execution_view.push_message(&message.content, cx);
106 });
107
108 self.editor
109 .update(cx, |editor, cx| {
110 let mut replacements = HashMap::default();
111
112 replacements.insert(
113 self.block_id,
114 (
115 Some(self.execution_view.num_lines(cx).saturating_add(1)),
116 Self::create_output_area_renderer(
117 self.execution_view.clone(),
118 self.on_close.clone(),
119 ),
120 ),
121 );
122 editor.replace_blocks(replacements, None, cx);
123 })
124 .ok();
125 }
126
127 fn create_output_area_renderer(
128 execution_view: View<ExecutionView>,
129 on_close: CloseBlockFn,
130 ) -> RenderBlock {
131 let render = move |cx: &mut BlockContext| {
132 let execution_view = execution_view.clone();
133 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
134 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
135
136 let gutter = cx.gutter_dimensions;
137 let close_button_size = IconSize::XSmall;
138
139 let block_id = cx.block_id;
140 let on_close = on_close.clone();
141
142 let rem_size = cx.rem_size();
143 let line_height = cx.text_style().line_height_in_pixels(rem_size);
144
145 let (close_button_width, close_button_padding) =
146 close_button_size.square_components(cx);
147
148 div()
149 .min_h(line_height)
150 .flex()
151 .flex_row()
152 .items_start()
153 .w_full()
154 .bg(cx.theme().colors().background)
155 .border_y_1()
156 .border_color(cx.theme().colors().border)
157 .child(
158 v_flex().min_h(cx.line_height()).justify_center().child(
159 h_flex()
160 .w(gutter.full_width())
161 .justify_end()
162 .pt(line_height / 2.)
163 .child(
164 h_flex()
165 .pr(gutter.width / 2. - close_button_width
166 + close_button_padding / 2.)
167 .child(
168 IconButton::new(
169 ("close_output_area", EntityId::from(cx.block_id)),
170 IconName::Close,
171 )
172 .shape(IconButtonShape::Square)
173 .icon_size(close_button_size)
174 .icon_color(Color::Muted)
175 .tooltip(|cx| Tooltip::text("Close output area", cx))
176 .on_click(
177 move |_, cx| {
178 if let BlockId::Custom(block_id) = block_id {
179 (on_close)(block_id, cx)
180 }
181 },
182 ),
183 ),
184 ),
185 ),
186 )
187 .child(
188 div()
189 .flex_1()
190 .size_full()
191 .my_2()
192 .mr(gutter.width)
193 .text_size(text_font_size)
194 .font_family(text_font)
195 .child(execution_view),
196 )
197 .into_any_element()
198 };
199
200 Box::new(render)
201 }
202}
203
204impl Session {
205 pub fn new(
206 editor: WeakView<Editor>,
207 fs: Arc<dyn Fs>,
208 kernel_specification: KernelSpecification,
209 cx: &mut ViewContext<Self>,
210 ) -> Self {
211 let entity_id = editor.entity_id();
212 let working_directory = editor
213 .upgrade()
214 .and_then(|editor| editor.read(cx).working_directory(cx))
215 .unwrap_or_else(temp_dir);
216 let kernel = RunningKernel::new(
217 kernel_specification.clone(),
218 entity_id,
219 working_directory,
220 fs.clone(),
221 cx,
222 );
223
224 let pending_kernel = cx
225 .spawn(|this, mut cx| async move {
226 let kernel = kernel.await;
227
228 match kernel {
229 Ok((mut kernel, mut messages_rx)) => {
230 this.update(&mut cx, |this, cx| {
231 // At this point we can create a new kind of kernel that has the process and our long running background tasks
232
233 let status = kernel.process.status();
234 this.kernel = Kernel::RunningKernel(kernel);
235
236 cx.spawn(|session, mut cx| async move {
237 let error_message = match status.await {
238 Ok(status) => {
239 if status.success() {
240 log::info!("kernel process exited successfully");
241 return;
242 }
243
244 format!("kernel process exited with status: {:?}", status)
245 }
246 Err(err) => {
247 format!("kernel process exited with error: {:?}", err)
248 }
249 };
250
251 log::error!("{}", error_message);
252
253 session
254 .update(&mut cx, |session, cx| {
255 session.kernel =
256 Kernel::ErroredLaunch(error_message.clone());
257
258 session.blocks.values().for_each(|block| {
259 block.execution_view.update(
260 cx,
261 |execution_view, cx| {
262 match execution_view.status {
263 ExecutionStatus::Finished => {
264 // Do nothing when the output was good
265 }
266 _ => {
267 // All other cases, set the status to errored
268 execution_view.status =
269 ExecutionStatus::KernelErrored(
270 error_message.clone(),
271 )
272 }
273 }
274 cx.notify();
275 },
276 );
277 });
278
279 cx.notify();
280 })
281 .ok();
282 })
283 .detach();
284
285 this.messaging_task = cx.spawn(|session, mut cx| async move {
286 while let Some(message) = messages_rx.next().await {
287 session
288 .update(&mut cx, |session, cx| {
289 session.route(&message, cx);
290 })
291 .ok();
292 }
293 });
294 })
295 .ok();
296 }
297 Err(err) => {
298 this.update(&mut cx, |this, _cx| {
299 this.kernel = Kernel::ErroredLaunch(err.to_string());
300 })
301 .ok();
302 }
303 }
304 })
305 .shared();
306
307 let subscription = match editor.upgrade() {
308 Some(editor) => {
309 let buffer = editor.read(cx).buffer().clone();
310 cx.subscribe(&buffer, Self::on_buffer_event)
311 }
312 None => Subscription::new(|| {}),
313 };
314
315 return Self {
316 editor,
317 kernel: Kernel::StartingKernel(pending_kernel),
318 messaging_task: Task::ready(()),
319 blocks: HashMap::default(),
320 kernel_specification,
321 _buffer_subscription: subscription,
322 };
323 }
324
325 fn on_buffer_event(
326 &mut self,
327 buffer: Model<MultiBuffer>,
328 event: &multi_buffer::Event,
329 cx: &mut ViewContext<Self>,
330 ) {
331 if let multi_buffer::Event::Edited { .. } = event {
332 let snapshot = buffer.read(cx).snapshot(cx);
333
334 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
335
336 self.blocks.retain(|_id, block| {
337 if block.invalidation_anchor.is_valid(&snapshot) {
338 true
339 } else {
340 blocks_to_remove.insert(block.block_id);
341 false
342 }
343 });
344
345 if !blocks_to_remove.is_empty() {
346 self.editor
347 .update(cx, |editor, cx| {
348 editor.remove_blocks(blocks_to_remove, None, cx);
349 })
350 .ok();
351 cx.notify();
352 }
353 }
354 }
355
356 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
357 match &mut self.kernel {
358 Kernel::RunningKernel(kernel) => {
359 kernel.request_tx.try_send(message).ok();
360 }
361 _ => {}
362 }
363
364 anyhow::Ok(())
365 }
366
367 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
368 let blocks_to_remove: HashSet<CustomBlockId> =
369 self.blocks.values().map(|block| block.block_id).collect();
370
371 self.editor
372 .update(cx, |editor, cx| {
373 editor.remove_blocks(blocks_to_remove, None, cx);
374 })
375 .ok();
376
377 self.blocks.clear();
378 }
379
380 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
381 let Some(editor) = self.editor.upgrade() else {
382 return;
383 };
384
385 if code.is_empty() {
386 return;
387 }
388
389 let execute_request = ExecuteRequest {
390 code: code.to_string(),
391 ..ExecuteRequest::default()
392 };
393
394 let message: JupyterMessage = execute_request.into();
395
396 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
397
398 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
399
400 self.blocks.retain(|_key, block| {
401 if anchor_range.overlaps(&block.code_range, &buffer) {
402 blocks_to_remove.insert(block.block_id);
403 false
404 } else {
405 true
406 }
407 });
408
409 self.editor
410 .update(cx, |editor, cx| {
411 editor.remove_blocks(blocks_to_remove, None, cx);
412 })
413 .ok();
414
415 let status = match &self.kernel {
416 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
417 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
418 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
419 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
420 Kernel::Shutdown => ExecutionStatus::Shutdown,
421 };
422
423 let parent_message_id = message.header.msg_id.clone();
424 let session_view = cx.view().downgrade();
425 let weak_editor = self.editor.clone();
426
427 let on_close: CloseBlockFn =
428 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
429 if let Some(session) = session_view.upgrade() {
430 session.update(cx, |session, cx| {
431 session.blocks.remove(&parent_message_id);
432 cx.notify();
433 });
434 }
435
436 if let Some(editor) = weak_editor.upgrade() {
437 editor.update(cx, |editor, cx| {
438 let mut block_ids = HashSet::default();
439 block_ids.insert(block_id);
440 editor.remove_blocks(block_ids, None, cx);
441 });
442 }
443 });
444
445 let Ok(editor_block) =
446 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
447 else {
448 return;
449 };
450
451 let new_cursor_pos = editor_block.invalidation_anchor;
452
453 self.blocks
454 .insert(message.header.msg_id.clone(), editor_block);
455
456 match &self.kernel {
457 Kernel::RunningKernel(_) => {
458 self.send(message, cx).ok();
459 }
460 Kernel::StartingKernel(task) => {
461 // Queue up the execution as a task to run after the kernel starts
462 let task = task.clone();
463 let message = message.clone();
464
465 cx.spawn(|this, mut cx| async move {
466 task.await;
467 this.update(&mut cx, |this, cx| {
468 this.send(message, cx).ok();
469 })
470 .ok();
471 })
472 .detach();
473 }
474 _ => {}
475 }
476
477 // Now move the cursor to after the block
478 editor.update(cx, move |editor, cx| {
479 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
480 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
481 });
482 });
483 }
484
485 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
486 let parent_message_id = match message.parent_header.as_ref() {
487 Some(header) => &header.msg_id,
488 None => return,
489 };
490
491 match &message.content {
492 JupyterMessageContent::Status(status) => {
493 self.kernel.set_execution_state(&status.execution_state);
494 cx.notify();
495 }
496 JupyterMessageContent::KernelInfoReply(reply) => {
497 self.kernel.set_kernel_info(&reply);
498 cx.notify();
499 }
500 _ => {}
501 }
502
503 if let Some(block) = self.blocks.get_mut(parent_message_id) {
504 block.handle_message(&message, cx);
505 return;
506 }
507 }
508
509 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
510 match &mut self.kernel {
511 Kernel::RunningKernel(_kernel) => {
512 self.send(InterruptRequest {}.into(), cx).ok();
513 }
514 Kernel::StartingKernel(_task) => {
515 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
516 }
517 _ => {}
518 }
519 }
520
521 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
522 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
523
524 match kernel {
525 Kernel::RunningKernel(mut kernel) => {
526 let mut request_tx = kernel.request_tx.clone();
527
528 cx.spawn(|this, mut cx| async move {
529 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
530 request_tx.try_send(message).ok();
531
532 // Give the kernel a bit of time to clean up
533 cx.background_executor().timer(Duration::from_secs(3)).await;
534
535 kernel.process.kill().ok();
536
537 this.update(&mut cx, |this, cx| {
538 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
539 this.clear_outputs(cx);
540 this.kernel = Kernel::Shutdown;
541 cx.notify();
542 })
543 .ok();
544 })
545 .detach();
546 }
547 Kernel::StartingKernel(_kernel) => {
548 self.kernel = Kernel::Shutdown;
549 }
550 _ => {
551 self.kernel = Kernel::Shutdown;
552 }
553 }
554 cx.notify();
555 }
556}
557
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    /// The session's kernel has been shut down; carries the session's weak editor handle.
    Shutdown(WeakView<Editor>),
}

// Lets `Session` emit `SessionEvent`s through `cx.emit`.
impl EventEmitter<SessionEvent> for Session {}
563
564impl Render for Session {
565 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
566 let (status_text, interrupt_button) = match &self.kernel {
567 Kernel::RunningKernel(kernel) => (
568 kernel
569 .kernel_info
570 .as_ref()
571 .map(|info| info.language_info.name.clone()),
572 Some(
573 Button::new("interrupt", "Interrupt")
574 .style(ButtonStyle::Subtle)
575 .on_click(cx.listener(move |session, _, cx| {
576 session.interrupt(cx);
577 })),
578 ),
579 ),
580 Kernel::StartingKernel(_) => (Some("Starting".into()), None),
581 Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
582 Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
583 Kernel::Shutdown => (Some("Shutdown".into()), None),
584 };
585
586 KernelListItem::new(self.kernel_specification.clone())
587 .status_color(match &self.kernel {
588 Kernel::RunningKernel(kernel) => match kernel.execution_state {
589 ExecutionState::Idle => Color::Success,
590 ExecutionState::Busy => Color::Modified,
591 },
592 Kernel::StartingKernel(_) => Color::Modified,
593 Kernel::ErroredLaunch(_) => Color::Error,
594 Kernel::ShuttingDown => Color::Modified,
595 Kernel::Shutdown => Color::Disabled,
596 })
597 .child(Label::new(self.kernel_specification.name.clone()))
598 .children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
599 .button(
600 Button::new("shutdown", "Shutdown")
601 .style(ButtonStyle::Subtle)
602 .disabled(self.kernel.is_shutting_down())
603 .on_click(cx.listener(move |session, _, cx| {
604 session.shutdown(cx);
605 })),
606 )
607 .buttons(interrupt_button)
608 }
609}