1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
9 RenderBlock,
10 },
11 scroll::Autoscroll,
12 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
13};
14use futures::{FutureExt as _, StreamExt as _};
15use gpui::{
16 div, prelude::*, EntityId, EventEmitter, Model, Render, Subscription, Task, View, ViewContext,
17 WeakView,
18};
19use language::Point;
20use project::Fs;
21use runtimelib::{
22 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
23};
24use settings::Settings as _;
25use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
26use theme::{ActiveTheme, ThemeSettings};
27use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, IconButtonShape, Label, Tooltip};
28
29pub struct Session {
30 pub editor: WeakView<Editor>,
31 pub kernel: Kernel,
32 blocks: HashMap<String, EditorBlock>,
33 pub messaging_task: Task<()>,
34 pub kernel_specification: KernelSpecification,
35 _buffer_subscription: Subscription,
36}
37
38struct EditorBlock {
39 editor: WeakView<Editor>,
40 code_range: Range<Anchor>,
41 invalidation_anchor: Anchor,
42 block_id: CustomBlockId,
43 execution_view: View<ExecutionView>,
44 on_close: CloseBlockFn,
45}
46
47type CloseBlockFn =
48 Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
49
50impl EditorBlock {
51 fn new(
52 editor: WeakView<Editor>,
53 code_range: Range<Anchor>,
54 status: ExecutionStatus,
55 on_close: CloseBlockFn,
56 cx: &mut ViewContext<Session>,
57 ) -> anyhow::Result<Self> {
58 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
59
60 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
61 let buffer = editor.buffer().clone();
62 let buffer_snapshot = buffer.read(cx).snapshot(cx);
63 let end_point = code_range.end.to_point(&buffer_snapshot);
64 let next_row_start = end_point + Point::new(1, 0);
65 if next_row_start > buffer_snapshot.max_point() {
66 buffer.update(cx, |buffer, cx| {
67 buffer.edit(
68 [(
69 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
70 "\n",
71 )],
72 None,
73 cx,
74 )
75 });
76 }
77
78 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
79 let block = BlockProperties {
80 position: code_range.end,
81 height: execution_view.num_lines(cx).saturating_add(1),
82 style: BlockStyle::Sticky,
83 render: Self::create_output_area_render(execution_view.clone(), on_close.clone()),
84 disposition: BlockDisposition::Below,
85 };
86
87 let block_id = editor.insert_blocks([block], None, cx)[0];
88 (block_id, invalidation_anchor)
89 })?;
90
91 anyhow::Ok(Self {
92 editor,
93 code_range,
94 invalidation_anchor,
95 block_id,
96 execution_view,
97 on_close,
98 })
99 }
100
101 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
102 self.execution_view.update(cx, |execution_view, cx| {
103 execution_view.push_message(&message.content, cx);
104 });
105
106 self.editor
107 .update(cx, |editor, cx| {
108 let mut replacements = HashMap::default();
109
110 replacements.insert(
111 self.block_id,
112 (
113 Some(self.execution_view.num_lines(cx).saturating_add(1)),
114 Self::create_output_area_render(
115 self.execution_view.clone(),
116 self.on_close.clone(),
117 ),
118 ),
119 );
120 editor.replace_blocks(replacements, None, cx);
121 })
122 .ok();
123 }
124
125 fn create_output_area_render(
126 execution_view: View<ExecutionView>,
127 on_close: CloseBlockFn,
128 ) -> RenderBlock {
129 let render = move |cx: &mut BlockContext| {
130 let execution_view = execution_view.clone();
131 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
132 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
133
134 let gutter = cx.gutter_dimensions;
135 let close_button_size = IconSize::XSmall;
136
137 let block_id = cx.block_id;
138 let on_close = on_close.clone();
139
140 let rem_size = cx.rem_size();
141 let line_height = cx.text_style().line_height_in_pixels(rem_size);
142
143 let (close_button_width, close_button_padding) =
144 close_button_size.square_components(cx);
145
146 div()
147 .min_h(line_height)
148 .flex()
149 .flex_row()
150 .items_start()
151 .w_full()
152 .bg(cx.theme().colors().background)
153 .border_y_1()
154 .border_color(cx.theme().colors().border)
155 .child(
156 v_flex().min_h(cx.line_height()).justify_center().child(
157 h_flex()
158 .w(gutter.full_width())
159 .justify_end()
160 .pt(line_height / 2.)
161 .child(
162 h_flex()
163 .pr(gutter.width / 2. - close_button_width
164 + close_button_padding / 2.)
165 .child(
166 IconButton::new(
167 ("close_output_area", EntityId::from(cx.block_id)),
168 IconName::Close,
169 )
170 .shape(IconButtonShape::Square)
171 .icon_size(close_button_size)
172 .icon_color(Color::Muted)
173 .tooltip(|cx| Tooltip::text("Close output area", cx))
174 .on_click(
175 move |_, cx| {
176 if let BlockId::Custom(block_id) = block_id {
177 (on_close)(block_id, cx)
178 }
179 },
180 ),
181 ),
182 ),
183 ),
184 )
185 .child(
186 div()
187 .flex_1()
188 .size_full()
189 .my_2()
190 .mr(gutter.width)
191 .text_size(text_font_size)
192 .font_family(text_font)
193 .child(execution_view),
194 )
195 .into_any_element()
196 };
197
198 Box::new(render)
199 }
200}
201
202impl Session {
203 pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
204 if let Some(working_directory) = editor
205 .upgrade()
206 .and_then(|editor| editor.read(cx).working_directory(cx))
207 {
208 working_directory
209 } else {
210 temp_dir()
211 }
212 }
213
214 pub fn new(
215 editor: WeakView<Editor>,
216 fs: Arc<dyn Fs>,
217 kernel_specification: KernelSpecification,
218 cx: &mut ViewContext<Self>,
219 ) -> Self {
220 let entity_id = editor.entity_id();
221
222 let kernel = RunningKernel::new(
223 kernel_specification.clone(),
224 entity_id,
225 Self::working_directory(editor.clone(), cx),
226 fs.clone(),
227 cx,
228 );
229
230 let pending_kernel = cx
231 .spawn(|this, mut cx| async move {
232 let kernel = kernel.await;
233
234 match kernel {
235 Ok((mut kernel, mut messages_rx)) => {
236 this.update(&mut cx, |this, cx| {
237 // At this point we can create a new kind of kernel that has the process and our long running background tasks
238
239 let status = kernel.process.status();
240 this.kernel = Kernel::RunningKernel(kernel);
241
242 cx.spawn(|session, mut cx| async move {
243 let error_message = match status.await {
244 Ok(status) => {
245 if status.success() {
246 log::info!("kernel process exited successfully");
247 return;
248 }
249
250 format!("kernel process exited with status: {:?}", status)
251 }
252 Err(err) => {
253 format!("kernel process exited with error: {:?}", err)
254 }
255 };
256
257 log::error!("{}", error_message);
258
259 session
260 .update(&mut cx, |session, cx| {
261 session.kernel =
262 Kernel::ErroredLaunch(error_message.clone());
263
264 session.blocks.values().for_each(|block| {
265 block.execution_view.update(
266 cx,
267 |execution_view, cx| {
268 match execution_view.status {
269 ExecutionStatus::Finished => {
270 // Do nothing when the output was good
271 }
272 _ => {
273 // All other cases, set the status to errored
274 execution_view.status =
275 ExecutionStatus::KernelErrored(
276 error_message.clone(),
277 )
278 }
279 }
280 cx.notify();
281 },
282 );
283 });
284
285 cx.notify();
286 })
287 .ok();
288 })
289 .detach();
290
291 this.messaging_task = cx.spawn(|session, mut cx| async move {
292 while let Some(message) = messages_rx.next().await {
293 session
294 .update(&mut cx, |session, cx| {
295 session.route(&message, cx);
296 })
297 .ok();
298 }
299 });
300 })
301 .ok();
302 }
303 Err(err) => {
304 this.update(&mut cx, |this, _cx| {
305 this.kernel = Kernel::ErroredLaunch(err.to_string());
306 })
307 .ok();
308 }
309 }
310 })
311 .shared();
312
313 let subscription = match editor.upgrade() {
314 Some(editor) => {
315 let buffer = editor.read(cx).buffer().clone();
316 cx.subscribe(&buffer, Self::on_buffer_event)
317 }
318 None => Subscription::new(|| {}),
319 };
320
321 return Self {
322 editor,
323 kernel: Kernel::StartingKernel(pending_kernel),
324 messaging_task: Task::ready(()),
325 blocks: HashMap::default(),
326 kernel_specification,
327 _buffer_subscription: subscription,
328 };
329 }
330
331 fn on_buffer_event(
332 &mut self,
333 buffer: Model<MultiBuffer>,
334 event: &multi_buffer::Event,
335 cx: &mut ViewContext<Self>,
336 ) {
337 if let multi_buffer::Event::Edited { .. } = event {
338 let snapshot = buffer.read(cx).snapshot(cx);
339
340 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
341
342 self.blocks.retain(|_id, block| {
343 if block.invalidation_anchor.is_valid(&snapshot) {
344 true
345 } else {
346 blocks_to_remove.insert(block.block_id);
347 false
348 }
349 });
350
351 if !blocks_to_remove.is_empty() {
352 self.editor
353 .update(cx, |editor, cx| {
354 editor.remove_blocks(blocks_to_remove, None, cx);
355 })
356 .ok();
357 cx.notify();
358 }
359 }
360 }
361
362 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
363 match &mut self.kernel {
364 Kernel::RunningKernel(kernel) => {
365 kernel.request_tx.try_send(message).ok();
366 }
367 _ => {}
368 }
369
370 anyhow::Ok(())
371 }
372
373 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
374 let blocks_to_remove: HashSet<CustomBlockId> =
375 self.blocks.values().map(|block| block.block_id).collect();
376
377 self.editor
378 .update(cx, |editor, cx| {
379 editor.remove_blocks(blocks_to_remove, None, cx);
380 })
381 .ok();
382
383 self.blocks.clear();
384 }
385
386 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
387 let editor = if let Some(editor) = self.editor.upgrade() {
388 editor
389 } else {
390 return;
391 };
392
393 if code.is_empty() {
394 return;
395 }
396
397 let execute_request = ExecuteRequest {
398 code: code.to_string(),
399 ..ExecuteRequest::default()
400 };
401
402 let message: JupyterMessage = execute_request.into();
403
404 let mut blocks_to_remove: HashSet<CustomBlockId> = HashSet::default();
405
406 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
407
408 self.blocks.retain(|_key, block| {
409 if anchor_range.overlaps(&block.code_range, &buffer) {
410 blocks_to_remove.insert(block.block_id);
411 false
412 } else {
413 true
414 }
415 });
416
417 self.editor
418 .update(cx, |editor, cx| {
419 editor.remove_blocks(blocks_to_remove, None, cx);
420 })
421 .ok();
422
423 let status = match &self.kernel {
424 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
425 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
426 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
427 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
428 Kernel::Shutdown => ExecutionStatus::Shutdown,
429 };
430
431 let parent_message_id = message.header.msg_id.clone();
432 let session_view = cx.view().downgrade();
433 let weak_editor = self.editor.clone();
434
435 let on_close: CloseBlockFn =
436 Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
437 if let Some(session) = session_view.upgrade() {
438 session.update(cx, |session, cx| {
439 session.blocks.remove(&parent_message_id);
440 cx.notify();
441 });
442 }
443
444 if let Some(editor) = weak_editor.upgrade() {
445 editor.update(cx, |editor, cx| {
446 let mut block_ids = HashSet::default();
447 block_ids.insert(block_id);
448 editor.remove_blocks(block_ids, None, cx);
449 });
450 }
451 });
452
453 let editor_block = if let Ok(editor_block) =
454 EditorBlock::new(self.editor.clone(), anchor_range, status, on_close, cx)
455 {
456 editor_block
457 } else {
458 return;
459 };
460
461 let new_cursor_pos = editor_block.invalidation_anchor;
462
463 self.blocks
464 .insert(message.header.msg_id.clone(), editor_block);
465
466 match &self.kernel {
467 Kernel::RunningKernel(_) => {
468 self.send(message, cx).ok();
469 }
470 Kernel::StartingKernel(task) => {
471 // Queue up the execution as a task to run after the kernel starts
472 let task = task.clone();
473 let message = message.clone();
474
475 cx.spawn(|this, mut cx| async move {
476 task.await;
477 this.update(&mut cx, |this, cx| {
478 this.send(message, cx).ok();
479 })
480 .ok();
481 })
482 .detach();
483 }
484 _ => {}
485 }
486
487 // Now move the cursor to after the block
488 editor.update(cx, move |editor, cx| {
489 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
490 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
491 });
492 });
493 }
494
495 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
496 let parent_message_id = match message.parent_header.as_ref() {
497 Some(header) => &header.msg_id,
498 None => return,
499 };
500
501 match &message.content {
502 JupyterMessageContent::Status(status) => {
503 self.kernel.set_execution_state(&status.execution_state);
504 cx.notify();
505 }
506 JupyterMessageContent::KernelInfoReply(reply) => {
507 self.kernel.set_kernel_info(&reply);
508 cx.notify();
509 }
510 _ => {}
511 }
512
513 if let Some(block) = self.blocks.get_mut(parent_message_id) {
514 block.handle_message(&message, cx);
515 return;
516 }
517 }
518
519 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
520 match &mut self.kernel {
521 Kernel::RunningKernel(_kernel) => {
522 self.send(InterruptRequest {}.into(), cx).ok();
523 }
524 Kernel::StartingKernel(_task) => {
525 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
526 }
527 _ => {}
528 }
529 }
530
531 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
532 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
533
534 match kernel {
535 Kernel::RunningKernel(mut kernel) => {
536 let mut request_tx = kernel.request_tx.clone();
537
538 cx.spawn(|this, mut cx| async move {
539 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
540 request_tx.try_send(message).ok();
541
542 // Give the kernel a bit of time to clean up
543 cx.background_executor().timer(Duration::from_secs(3)).await;
544
545 kernel.process.kill().ok();
546
547 this.update(&mut cx, |this, cx| {
548 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
549 this.clear_outputs(cx);
550 this.kernel = Kernel::Shutdown;
551 cx.notify();
552 })
553 .ok();
554 })
555 .detach();
556 }
557 Kernel::StartingKernel(_kernel) => {
558 self.kernel = Kernel::Shutdown;
559 }
560 _ => {
561 self.kernel = Kernel::Shutdown;
562 }
563 }
564 cx.notify();
565 }
566}
567
568pub enum SessionEvent {
569 Shutdown(WeakView<Editor>),
570}
571
572impl EventEmitter<SessionEvent> for Session {}
573
574impl Render for Session {
575 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
576 let mut buttons = vec![];
577
578 buttons.push(
579 ButtonLike::new("shutdown")
580 .child(Label::new("Shutdown"))
581 .style(ButtonStyle::Subtle)
582 .on_click(cx.listener(move |session, _, cx| {
583 session.shutdown(cx);
584 })),
585 );
586
587 let status_text = match &self.kernel {
588 Kernel::RunningKernel(kernel) => {
589 buttons.push(
590 ButtonLike::new("interrupt")
591 .child(Label::new("Interrupt"))
592 .style(ButtonStyle::Subtle)
593 .on_click(cx.listener(move |session, _, cx| {
594 session.interrupt(cx);
595 })),
596 );
597 let mut name = self.kernel_specification.name.clone();
598
599 if let Some(info) = &kernel.kernel_info {
600 name.push_str(" (");
601 name.push_str(&info.language_info.name);
602 name.push_str(")");
603 }
604 name
605 }
606 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
607 Kernel::ErroredLaunch(err) => {
608 format!("{} (Error: {})", self.kernel_specification.name, err)
609 }
610 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
611 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
612 };
613
614 return v_flex()
615 .gap_1()
616 .child(
617 h_flex()
618 .gap_2()
619 .child(self.kernel.dot())
620 .child(Label::new(status_text)),
621 )
622 .child(h_flex().gap_2().children(buttons));
623 }
624}