1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
11};
12use futures::{FutureExt as _, StreamExt as _};
13use gpui::{
14 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
15};
16use language::Point;
17use project::Fs;
18use runtimelib::{
19 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
20 ShutdownRequest,
21};
22use settings::Settings as _;
23use std::{ops::Range, sync::Arc, time::Duration};
24use theme::{ActiveTheme, ThemeSettings};
25use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
26
27pub struct Session {
28 pub editor: WeakView<Editor>,
29 pub kernel: Kernel,
30 blocks: HashMap<String, EditorBlock>,
31 pub messaging_task: Task<()>,
32 pub kernel_specification: KernelSpecification,
33 _buffer_subscription: Subscription,
34}
35
36struct EditorBlock {
37 editor: WeakView<Editor>,
38 code_range: Range<Anchor>,
39 invalidation_anchor: Anchor,
40 block_id: BlockId,
41 execution_view: View<ExecutionView>,
42}
43
44impl EditorBlock {
45 fn new(
46 editor: WeakView<Editor>,
47 code_range: Range<Anchor>,
48 status: ExecutionStatus,
49 cx: &mut ViewContext<Session>,
50 ) -> anyhow::Result<Self> {
51 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
52
53 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
54 let buffer = editor.buffer().clone();
55 let buffer_snapshot = buffer.read(cx).snapshot(cx);
56 let end_point = code_range.end.to_point(&buffer_snapshot);
57 let next_row_start = end_point + Point::new(1, 0);
58 if next_row_start > buffer_snapshot.max_point() {
59 buffer.update(cx, |buffer, cx| {
60 buffer.edit(
61 [(
62 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
63 "\n",
64 )],
65 None,
66 cx,
67 )
68 });
69 }
70
71 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
72 let block = BlockProperties {
73 position: code_range.end,
74 height: execution_view.num_lines(cx).saturating_add(1),
75 style: BlockStyle::Sticky,
76 render: Self::create_output_area_render(execution_view.clone()),
77 disposition: BlockDisposition::Below,
78 };
79
80 let block_id = editor.insert_blocks([block], None, cx)[0];
81 (block_id, invalidation_anchor)
82 })?;
83
84 anyhow::Ok(Self {
85 editor,
86 code_range,
87 invalidation_anchor,
88 block_id,
89 execution_view,
90 })
91 }
92
93 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
94 self.execution_view.update(cx, |execution_view, cx| {
95 execution_view.push_message(&message.content, cx);
96 });
97
98 self.editor
99 .update(cx, |editor, cx| {
100 let mut replacements = HashMap::default();
101 replacements.insert(
102 self.block_id,
103 (
104 Some(self.execution_view.num_lines(cx).saturating_add(1)),
105 Self::create_output_area_render(self.execution_view.clone()),
106 ),
107 );
108 editor.replace_blocks(replacements, None, cx);
109 })
110 .ok();
111 }
112
113 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
114 let render = move |cx: &mut BlockContext| {
115 let execution_view = execution_view.clone();
116 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
117 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
118 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
119
120 let gutter_width = cx.gutter_dimensions.width;
121
122 h_flex()
123 .w_full()
124 .bg(cx.theme().colors().background)
125 .border_y_1()
126 .border_color(cx.theme().colors().border)
127 .pl(gutter_width)
128 .child(
129 div()
130 .text_size(text_font_size)
131 .font_family(text_font)
132 // .ml(gutter_width)
133 .mx_1()
134 .my_2()
135 .h_full()
136 .w_full()
137 .mr(gutter_width)
138 .child(execution_view),
139 )
140 .into_any_element()
141 };
142
143 Box::new(render)
144 }
145}
146
147impl Session {
148 pub fn new(
149 editor: WeakView<Editor>,
150 fs: Arc<dyn Fs>,
151 kernel_specification: KernelSpecification,
152 cx: &mut ViewContext<Self>,
153 ) -> Self {
154 let entity_id = editor.entity_id();
155 let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
156
157 let pending_kernel = cx
158 .spawn(|this, mut cx| async move {
159 let kernel = kernel.await;
160
161 match kernel {
162 Ok((kernel, mut messages_rx)) => {
163 this.update(&mut cx, |this, cx| {
164 // At this point we can create a new kind of kernel that has the process and our long running background tasks
165 this.kernel = Kernel::RunningKernel(kernel);
166
167 this.messaging_task = cx.spawn(|session, mut cx| async move {
168 while let Some(message) = messages_rx.next().await {
169 session
170 .update(&mut cx, |session, cx| {
171 session.route(&message, cx);
172 })
173 .ok();
174 }
175 });
176
177 // For some reason sending a kernel info request will brick the ark (R) kernel.
178 // Note that Deno and Python do not have this issue.
179 if this.kernel_specification.name == "ark" {
180 return;
181 }
182
183 // Get kernel info after (possibly) letting the kernel start
184 cx.spawn(|this, mut cx| async move {
185 cx.background_executor()
186 .timer(Duration::from_millis(120))
187 .await;
188 this.update(&mut cx, |this, _cx| {
189 this.send(KernelInfoRequest {}.into(), _cx).ok();
190 })
191 .ok();
192 })
193 .detach();
194 })
195 .ok();
196 }
197 Err(err) => {
198 this.update(&mut cx, |this, _cx| {
199 this.kernel = Kernel::ErroredLaunch(err.to_string());
200 })
201 .ok();
202 }
203 }
204 })
205 .shared();
206
207 let subscription = match editor.upgrade() {
208 Some(editor) => {
209 let buffer = editor.read(cx).buffer().clone();
210 cx.subscribe(&buffer, Self::on_buffer_event)
211 }
212 None => Subscription::new(|| {}),
213 };
214
215 return Self {
216 editor,
217 kernel: Kernel::StartingKernel(pending_kernel),
218 messaging_task: Task::ready(()),
219 blocks: HashMap::default(),
220 kernel_specification,
221 _buffer_subscription: subscription,
222 };
223 }
224
225 fn on_buffer_event(
226 &mut self,
227 buffer: Model<MultiBuffer>,
228 event: &multi_buffer::Event,
229 cx: &mut ViewContext<Self>,
230 ) {
231 if let multi_buffer::Event::Edited { .. } = event {
232 let snapshot = buffer.read(cx).snapshot(cx);
233
234 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
235
236 self.blocks.retain(|_id, block| {
237 if block.invalidation_anchor.is_valid(&snapshot) {
238 true
239 } else {
240 blocks_to_remove.insert(block.block_id);
241 false
242 }
243 });
244
245 if !blocks_to_remove.is_empty() {
246 self.editor
247 .update(cx, |editor, cx| {
248 editor.remove_blocks(blocks_to_remove, None, cx);
249 })
250 .ok();
251 cx.notify();
252 }
253 }
254 }
255
256 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
257 match &mut self.kernel {
258 Kernel::RunningKernel(kernel) => {
259 kernel.request_tx.try_send(message).ok();
260 }
261 _ => {}
262 }
263
264 anyhow::Ok(())
265 }
266
267 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
268 let blocks_to_remove: HashSet<BlockId> =
269 self.blocks.values().map(|block| block.block_id).collect();
270
271 self.editor
272 .update(cx, |editor, cx| {
273 editor.remove_blocks(blocks_to_remove, None, cx);
274 })
275 .ok();
276
277 self.blocks.clear();
278 }
279
280 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
281 let editor = if let Some(editor) = self.editor.upgrade() {
282 editor
283 } else {
284 return;
285 };
286
287 let execute_request = ExecuteRequest {
288 code: code.to_string(),
289 ..ExecuteRequest::default()
290 };
291
292 let message: JupyterMessage = execute_request.into();
293
294 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
295
296 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
297
298 self.blocks.retain(|_key, block| {
299 if anchor_range.overlaps(&block.code_range, &buffer) {
300 blocks_to_remove.insert(block.block_id);
301 false
302 } else {
303 true
304 }
305 });
306
307 self.editor
308 .update(cx, |editor, cx| {
309 editor.remove_blocks(blocks_to_remove, None, cx);
310 })
311 .ok();
312
313 let status = match &self.kernel {
314 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
315 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
316 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
317 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
318 Kernel::Shutdown => ExecutionStatus::Shutdown,
319 };
320
321 let editor_block = if let Ok(editor_block) =
322 EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
323 {
324 editor_block
325 } else {
326 return;
327 };
328
329 self.blocks
330 .insert(message.header.msg_id.clone(), editor_block);
331
332 match &self.kernel {
333 Kernel::RunningKernel(_) => {
334 self.send(message, cx).ok();
335 }
336 Kernel::StartingKernel(task) => {
337 // Queue up the execution as a task to run after the kernel starts
338 let task = task.clone();
339 let message = message.clone();
340
341 cx.spawn(|this, mut cx| async move {
342 task.await;
343 this.update(&mut cx, |this, cx| {
344 this.send(message, cx).ok();
345 })
346 .ok();
347 })
348 .detach();
349 }
350 _ => {}
351 }
352 }
353
354 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
355 let parent_message_id = match message.parent_header.as_ref() {
356 Some(header) => &header.msg_id,
357 None => return,
358 };
359
360 match &message.content {
361 JupyterMessageContent::Status(status) => {
362 self.kernel.set_execution_state(&status.execution_state);
363 cx.notify();
364 }
365 JupyterMessageContent::KernelInfoReply(reply) => {
366 self.kernel.set_kernel_info(&reply);
367 cx.notify();
368 }
369 _ => {}
370 }
371
372 if let Some(block) = self.blocks.get_mut(parent_message_id) {
373 block.handle_message(&message, cx);
374 return;
375 }
376 }
377
378 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
379 match &mut self.kernel {
380 Kernel::RunningKernel(_kernel) => {
381 self.send(InterruptRequest {}.into(), cx).ok();
382 }
383 Kernel::StartingKernel(_task) => {
384 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
385 }
386 _ => {}
387 }
388 }
389
390 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
391 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
392
393 match kernel {
394 Kernel::RunningKernel(mut kernel) => {
395 let mut request_tx = kernel.request_tx.clone();
396
397 cx.spawn(|this, mut cx| async move {
398 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
399 request_tx.try_send(message).ok();
400
401 // Give the kernel a bit of time to clean up
402 cx.background_executor().timer(Duration::from_secs(3)).await;
403
404 kernel.process.kill().ok();
405
406 this.update(&mut cx, |this, cx| {
407 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
408 this.clear_outputs(cx);
409 this.kernel = Kernel::Shutdown;
410 cx.notify();
411 })
412 .ok();
413 })
414 .detach();
415 }
416 Kernel::StartingKernel(_kernel) => {
417 self.kernel = Kernel::Shutdown;
418 }
419 _ => {
420 self.kernel = Kernel::Shutdown;
421 }
422 }
423 cx.notify();
424 }
425}
426
427pub enum SessionEvent {
428 Shutdown(WeakView<Editor>),
429}
430
431impl EventEmitter<SessionEvent> for Session {}
432
433impl Render for Session {
434 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
435 let mut buttons = vec![];
436
437 buttons.push(
438 ButtonLike::new("shutdown")
439 .child(Label::new("Shutdown"))
440 .style(ButtonStyle::Subtle)
441 .on_click(cx.listener(move |session, _, cx| {
442 session.shutdown(cx);
443 })),
444 );
445
446 let status_text = match &self.kernel {
447 Kernel::RunningKernel(kernel) => {
448 buttons.push(
449 ButtonLike::new("interrupt")
450 .child(Label::new("Interrupt"))
451 .style(ButtonStyle::Subtle)
452 .on_click(cx.listener(move |session, _, cx| {
453 session.interrupt(cx);
454 })),
455 );
456 let mut name = self.kernel_specification.name.clone();
457
458 if let Some(info) = &kernel.kernel_info {
459 name.push_str(" (");
460 name.push_str(&info.language_info.name);
461 name.push_str(")");
462 }
463 name
464 }
465 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
466 Kernel::ErroredLaunch(err) => {
467 format!("{} (Error: {})", self.kernel_specification.name, err)
468 }
469 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
470 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
471 };
472
473 return v_flex()
474 .gap_1()
475 .child(
476 h_flex()
477 .gap_2()
478 .child(self.kernel.dot())
479 .child(Label::new(status_text)),
480 )
481 .child(h_flex().gap_2().children(buttons));
482 }
483}