1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 scroll::Autoscroll,
11 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
12};
13use futures::{FutureExt as _, StreamExt as _};
14use gpui::{
15 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
16};
17use language::Point;
18use project::Fs;
19use runtimelib::{
20 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
21};
22use settings::Settings as _;
23use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
24use theme::{ActiveTheme, ThemeSettings};
25use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
26
27pub struct Session {
28 pub editor: WeakView<Editor>,
29 pub kernel: Kernel,
30 blocks: HashMap<String, EditorBlock>,
31 pub messaging_task: Task<()>,
32 pub kernel_specification: KernelSpecification,
33 _buffer_subscription: Subscription,
34}
35
36struct EditorBlock {
37 editor: WeakView<Editor>,
38 code_range: Range<Anchor>,
39 invalidation_anchor: Anchor,
40 block_id: BlockId,
41 execution_view: View<ExecutionView>,
42}
43
44impl EditorBlock {
45 fn new(
46 editor: WeakView<Editor>,
47 code_range: Range<Anchor>,
48 status: ExecutionStatus,
49 cx: &mut ViewContext<Session>,
50 ) -> anyhow::Result<Self> {
51 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
52
53 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
54 let buffer = editor.buffer().clone();
55 let buffer_snapshot = buffer.read(cx).snapshot(cx);
56 let end_point = code_range.end.to_point(&buffer_snapshot);
57 let next_row_start = end_point + Point::new(1, 0);
58 if next_row_start > buffer_snapshot.max_point() {
59 buffer.update(cx, |buffer, cx| {
60 buffer.edit(
61 [(
62 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
63 "\n",
64 )],
65 None,
66 cx,
67 )
68 });
69 }
70
71 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
72 let block = BlockProperties {
73 position: code_range.end,
74 height: execution_view.num_lines(cx).saturating_add(1),
75 style: BlockStyle::Sticky,
76 render: Self::create_output_area_render(execution_view.clone()),
77 disposition: BlockDisposition::Below,
78 };
79
80 let block_id = editor.insert_blocks([block], None, cx)[0];
81 (block_id, invalidation_anchor)
82 })?;
83
84 anyhow::Ok(Self {
85 editor,
86 code_range,
87 invalidation_anchor,
88 block_id,
89 execution_view,
90 })
91 }
92
93 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
94 self.execution_view.update(cx, |execution_view, cx| {
95 execution_view.push_message(&message.content, cx);
96 });
97
98 self.editor
99 .update(cx, |editor, cx| {
100 let mut replacements = HashMap::default();
101 replacements.insert(
102 self.block_id,
103 (
104 Some(self.execution_view.num_lines(cx).saturating_add(1)),
105 Self::create_output_area_render(self.execution_view.clone()),
106 ),
107 );
108 editor.replace_blocks(replacements, None, cx);
109 })
110 .ok();
111 }
112
113 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
114 let render = move |cx: &mut BlockContext| {
115 let execution_view = execution_view.clone();
116 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
119
120 let gutter_width = cx.gutter_dimensions.width;
121
122 h_flex()
123 .w_full()
124 .bg(cx.theme().colors().background)
125 .border_y_1()
126 .border_color(cx.theme().colors().border)
127 .pl(gutter_width)
128 .child(
129 div()
130 .text_size(text_font_size)
131 .font_family(text_font)
132 // .ml(gutter_width)
133 .mx_1()
134 .my_2()
135 .h_full()
136 .w_full()
137 .mr(gutter_width)
138 .child(execution_view),
139 )
140 .into_any_element()
141 };
142
143 Box::new(render)
144 }
145}
146
147impl Session {
148 pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
149 if let Some(working_directory) = editor
150 .upgrade()
151 .and_then(|editor| editor.read(cx).working_directory(cx))
152 {
153 working_directory
154 } else {
155 temp_dir()
156 }
157 }
158
159 pub fn new(
160 editor: WeakView<Editor>,
161 fs: Arc<dyn Fs>,
162 kernel_specification: KernelSpecification,
163 cx: &mut ViewContext<Self>,
164 ) -> Self {
165 let entity_id = editor.entity_id();
166
167 let kernel = RunningKernel::new(
168 kernel_specification.clone(),
169 entity_id,
170 Self::working_directory(editor.clone(), cx),
171 fs.clone(),
172 cx,
173 );
174
175 let pending_kernel = cx
176 .spawn(|this, mut cx| async move {
177 let kernel = kernel.await;
178
179 match kernel {
180 Ok((mut kernel, mut messages_rx)) => {
181 this.update(&mut cx, |this, cx| {
182 // At this point we can create a new kind of kernel that has the process and our long running background tasks
183
184 let status = kernel.process.status();
185 this.kernel = Kernel::RunningKernel(kernel);
186
187 cx.spawn(|session, mut cx| async move {
188 let error_message = match status.await {
189 Ok(status) => {
190 if status.success() {
191 log::info!("kernel process exited successfully");
192 return;
193 }
194
195 format!("kernel process exited with status: {:?}", status)
196 }
197 Err(err) => {
198 format!("kernel process exited with error: {:?}", err)
199 }
200 };
201
202 log::error!("{}", error_message);
203
204 session
205 .update(&mut cx, |session, cx| {
206 session.kernel =
207 Kernel::ErroredLaunch(error_message.clone());
208
209 session.blocks.values().for_each(|block| {
210 block.execution_view.update(
211 cx,
212 |execution_view, cx| {
213 match execution_view.status {
214 ExecutionStatus::Finished => {
215 // Do nothing when the output was good
216 }
217 _ => {
218 // All other cases, set the status to errored
219 execution_view.status =
220 ExecutionStatus::KernelErrored(
221 error_message.clone(),
222 )
223 }
224 }
225 cx.notify();
226 },
227 );
228 });
229
230 cx.notify();
231 })
232 .ok();
233 })
234 .detach();
235
236 this.messaging_task = cx.spawn(|session, mut cx| async move {
237 while let Some(message) = messages_rx.next().await {
238 session
239 .update(&mut cx, |session, cx| {
240 session.route(&message, cx);
241 })
242 .ok();
243 }
244 });
245 })
246 .ok();
247 }
248 Err(err) => {
249 this.update(&mut cx, |this, _cx| {
250 this.kernel = Kernel::ErroredLaunch(err.to_string());
251 })
252 .ok();
253 }
254 }
255 })
256 .shared();
257
258 let subscription = match editor.upgrade() {
259 Some(editor) => {
260 let buffer = editor.read(cx).buffer().clone();
261 cx.subscribe(&buffer, Self::on_buffer_event)
262 }
263 None => Subscription::new(|| {}),
264 };
265
266 return Self {
267 editor,
268 kernel: Kernel::StartingKernel(pending_kernel),
269 messaging_task: Task::ready(()),
270 blocks: HashMap::default(),
271 kernel_specification,
272 _buffer_subscription: subscription,
273 };
274 }
275
276 fn on_buffer_event(
277 &mut self,
278 buffer: Model<MultiBuffer>,
279 event: &multi_buffer::Event,
280 cx: &mut ViewContext<Self>,
281 ) {
282 if let multi_buffer::Event::Edited { .. } = event {
283 let snapshot = buffer.read(cx).snapshot(cx);
284
285 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
286
287 self.blocks.retain(|_id, block| {
288 if block.invalidation_anchor.is_valid(&snapshot) {
289 true
290 } else {
291 blocks_to_remove.insert(block.block_id);
292 false
293 }
294 });
295
296 if !blocks_to_remove.is_empty() {
297 self.editor
298 .update(cx, |editor, cx| {
299 editor.remove_blocks(blocks_to_remove, None, cx);
300 })
301 .ok();
302 cx.notify();
303 }
304 }
305 }
306
307 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
308 match &mut self.kernel {
309 Kernel::RunningKernel(kernel) => {
310 kernel.request_tx.try_send(message).ok();
311 }
312 _ => {}
313 }
314
315 anyhow::Ok(())
316 }
317
318 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
319 let blocks_to_remove: HashSet<BlockId> =
320 self.blocks.values().map(|block| block.block_id).collect();
321
322 self.editor
323 .update(cx, |editor, cx| {
324 editor.remove_blocks(blocks_to_remove, None, cx);
325 })
326 .ok();
327
328 self.blocks.clear();
329 }
330
331 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
332 let editor = if let Some(editor) = self.editor.upgrade() {
333 editor
334 } else {
335 return;
336 };
337
338 if code.is_empty() {
339 return;
340 }
341
342 let execute_request = ExecuteRequest {
343 code: code.to_string(),
344 ..ExecuteRequest::default()
345 };
346
347 let message: JupyterMessage = execute_request.into();
348
349 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
350
351 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
352
353 self.blocks.retain(|_key, block| {
354 if anchor_range.overlaps(&block.code_range, &buffer) {
355 blocks_to_remove.insert(block.block_id);
356 false
357 } else {
358 true
359 }
360 });
361
362 self.editor
363 .update(cx, |editor, cx| {
364 editor.remove_blocks(blocks_to_remove, None, cx);
365 })
366 .ok();
367
368 let status = match &self.kernel {
369 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
370 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
371 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
372 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
373 Kernel::Shutdown => ExecutionStatus::Shutdown,
374 };
375
376 let editor_block = if let Ok(editor_block) =
377 EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
378 {
379 editor_block
380 } else {
381 return;
382 };
383
384 let new_cursor_pos = editor_block.invalidation_anchor;
385
386 self.blocks
387 .insert(message.header.msg_id.clone(), editor_block);
388
389 match &self.kernel {
390 Kernel::RunningKernel(_) => {
391 self.send(message, cx).ok();
392 }
393 Kernel::StartingKernel(task) => {
394 // Queue up the execution as a task to run after the kernel starts
395 let task = task.clone();
396 let message = message.clone();
397
398 cx.spawn(|this, mut cx| async move {
399 task.await;
400 this.update(&mut cx, |this, cx| {
401 this.send(message, cx).ok();
402 })
403 .ok();
404 })
405 .detach();
406 }
407 _ => {}
408 }
409
410 // Now move the cursor to after the block
411 editor.update(cx, move |editor, cx| {
412 editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
413 selections.select_ranges([new_cursor_pos..new_cursor_pos]);
414 });
415 });
416 }
417
418 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
419 let parent_message_id = match message.parent_header.as_ref() {
420 Some(header) => &header.msg_id,
421 None => return,
422 };
423
424 match &message.content {
425 JupyterMessageContent::Status(status) => {
426 self.kernel.set_execution_state(&status.execution_state);
427 cx.notify();
428 }
429 JupyterMessageContent::KernelInfoReply(reply) => {
430 self.kernel.set_kernel_info(&reply);
431 cx.notify();
432 }
433 _ => {}
434 }
435
436 if let Some(block) = self.blocks.get_mut(parent_message_id) {
437 block.handle_message(&message, cx);
438 return;
439 }
440 }
441
442 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
443 match &mut self.kernel {
444 Kernel::RunningKernel(_kernel) => {
445 self.send(InterruptRequest {}.into(), cx).ok();
446 }
447 Kernel::StartingKernel(_task) => {
448 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
449 }
450 _ => {}
451 }
452 }
453
454 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
455 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
456
457 match kernel {
458 Kernel::RunningKernel(mut kernel) => {
459 let mut request_tx = kernel.request_tx.clone();
460
461 cx.spawn(|this, mut cx| async move {
462 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
463 request_tx.try_send(message).ok();
464
465 // Give the kernel a bit of time to clean up
466 cx.background_executor().timer(Duration::from_secs(3)).await;
467
468 kernel.process.kill().ok();
469
470 this.update(&mut cx, |this, cx| {
471 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
472 this.clear_outputs(cx);
473 this.kernel = Kernel::Shutdown;
474 cx.notify();
475 })
476 .ok();
477 })
478 .detach();
479 }
480 Kernel::StartingKernel(_kernel) => {
481 self.kernel = Kernel::Shutdown;
482 }
483 _ => {
484 self.kernel = Kernel::Shutdown;
485 }
486 }
487 cx.notify();
488 }
489}
490
491pub enum SessionEvent {
492 Shutdown(WeakView<Editor>),
493}
494
495impl EventEmitter<SessionEvent> for Session {}
496
497impl Render for Session {
498 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
499 let mut buttons = vec![];
500
501 buttons.push(
502 ButtonLike::new("shutdown")
503 .child(Label::new("Shutdown"))
504 .style(ButtonStyle::Subtle)
505 .on_click(cx.listener(move |session, _, cx| {
506 session.shutdown(cx);
507 })),
508 );
509
510 let status_text = match &self.kernel {
511 Kernel::RunningKernel(kernel) => {
512 buttons.push(
513 ButtonLike::new("interrupt")
514 .child(Label::new("Interrupt"))
515 .style(ButtonStyle::Subtle)
516 .on_click(cx.listener(move |session, _, cx| {
517 session.interrupt(cx);
518 })),
519 );
520 let mut name = self.kernel_specification.name.clone();
521
522 if let Some(info) = &kernel.kernel_info {
523 name.push_str(" (");
524 name.push_str(&info.language_info.name);
525 name.push_str(")");
526 }
527 name
528 }
529 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
530 Kernel::ErroredLaunch(err) => {
531 format!("{} (Error: {})", self.kernel_specification.name, err)
532 }
533 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
534 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
535 };
536
537 return v_flex()
538 .gap_1()
539 .child(
540 h_flex()
541 .gap_2()
542 .child(self.kernel.dot())
543 .child(Label::new(status_text)),
544 )
545 .child(h_flex().gap_2().children(buttons));
546 }
547}