1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
11};
12use futures::{FutureExt as _, StreamExt as _};
13use gpui::{
14 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
15};
16use language::Point;
17use project::Fs;
18use runtimelib::{
19 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
20};
21use settings::Settings as _;
22use std::{ops::Range, sync::Arc, time::Duration};
23use theme::{ActiveTheme, ThemeSettings};
24use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
25
26pub struct Session {
27 pub editor: WeakView<Editor>,
28 pub kernel: Kernel,
29 blocks: HashMap<String, EditorBlock>,
30 pub messaging_task: Task<()>,
31 pub kernel_specification: KernelSpecification,
32 _buffer_subscription: Subscription,
33}
34
35struct EditorBlock {
36 editor: WeakView<Editor>,
37 code_range: Range<Anchor>,
38 invalidation_anchor: Anchor,
39 block_id: BlockId,
40 execution_view: View<ExecutionView>,
41}
42
43impl EditorBlock {
44 fn new(
45 editor: WeakView<Editor>,
46 code_range: Range<Anchor>,
47 status: ExecutionStatus,
48 cx: &mut ViewContext<Session>,
49 ) -> anyhow::Result<Self> {
50 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
51
52 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
53 let buffer = editor.buffer().clone();
54 let buffer_snapshot = buffer.read(cx).snapshot(cx);
55 let end_point = code_range.end.to_point(&buffer_snapshot);
56 let next_row_start = end_point + Point::new(1, 0);
57 if next_row_start > buffer_snapshot.max_point() {
58 buffer.update(cx, |buffer, cx| {
59 buffer.edit(
60 [(
61 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
62 "\n",
63 )],
64 None,
65 cx,
66 )
67 });
68 }
69
70 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
71 let block = BlockProperties {
72 position: code_range.end,
73 height: execution_view.num_lines(cx).saturating_add(1),
74 style: BlockStyle::Sticky,
75 render: Self::create_output_area_render(execution_view.clone()),
76 disposition: BlockDisposition::Below,
77 };
78
79 let block_id = editor.insert_blocks([block], None, cx)[0];
80 (block_id, invalidation_anchor)
81 })?;
82
83 anyhow::Ok(Self {
84 editor,
85 code_range,
86 invalidation_anchor,
87 block_id,
88 execution_view,
89 })
90 }
91
92 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
93 self.execution_view.update(cx, |execution_view, cx| {
94 execution_view.push_message(&message.content, cx);
95 });
96
97 self.editor
98 .update(cx, |editor, cx| {
99 let mut replacements = HashMap::default();
100 replacements.insert(
101 self.block_id,
102 (
103 Some(self.execution_view.num_lines(cx).saturating_add(1)),
104 Self::create_output_area_render(self.execution_view.clone()),
105 ),
106 );
107 editor.replace_blocks(replacements, None, cx);
108 })
109 .ok();
110 }
111
112 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
113 let render = move |cx: &mut BlockContext| {
114 let execution_view = execution_view.clone();
115 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
118
119 let gutter_width = cx.gutter_dimensions.width;
120
121 h_flex()
122 .w_full()
123 .bg(cx.theme().colors().background)
124 .border_y_1()
125 .border_color(cx.theme().colors().border)
126 .pl(gutter_width)
127 .child(
128 div()
129 .text_size(text_font_size)
130 .font_family(text_font)
131 // .ml(gutter_width)
132 .mx_1()
133 .my_2()
134 .h_full()
135 .w_full()
136 .mr(gutter_width)
137 .child(execution_view),
138 )
139 .into_any_element()
140 };
141
142 Box::new(render)
143 }
144}
145
146impl Session {
147 pub fn new(
148 editor: WeakView<Editor>,
149 fs: Arc<dyn Fs>,
150 kernel_specification: KernelSpecification,
151 cx: &mut ViewContext<Self>,
152 ) -> Self {
153 let entity_id = editor.entity_id();
154 let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
155
156 let pending_kernel = cx
157 .spawn(|this, mut cx| async move {
158 let kernel = kernel.await;
159
160 match kernel {
161 Ok((kernel, mut messages_rx)) => {
162 this.update(&mut cx, |this, cx| {
163 // At this point we can create a new kind of kernel that has the process and our long running background tasks
164 this.kernel = Kernel::RunningKernel(kernel);
165
166 this.messaging_task = cx.spawn(|session, mut cx| async move {
167 while let Some(message) = messages_rx.next().await {
168 session
169 .update(&mut cx, |session, cx| {
170 session.route(&message, cx);
171 })
172 .ok();
173 }
174 });
175 })
176 .ok();
177 }
178 Err(err) => {
179 this.update(&mut cx, |this, _cx| {
180 this.kernel = Kernel::ErroredLaunch(err.to_string());
181 })
182 .ok();
183 }
184 }
185 })
186 .shared();
187
188 let subscription = match editor.upgrade() {
189 Some(editor) => {
190 let buffer = editor.read(cx).buffer().clone();
191 cx.subscribe(&buffer, Self::on_buffer_event)
192 }
193 None => Subscription::new(|| {}),
194 };
195
196 return Self {
197 editor,
198 kernel: Kernel::StartingKernel(pending_kernel),
199 messaging_task: Task::ready(()),
200 blocks: HashMap::default(),
201 kernel_specification,
202 _buffer_subscription: subscription,
203 };
204 }
205
206 fn on_buffer_event(
207 &mut self,
208 buffer: Model<MultiBuffer>,
209 event: &multi_buffer::Event,
210 cx: &mut ViewContext<Self>,
211 ) {
212 if let multi_buffer::Event::Edited { .. } = event {
213 let snapshot = buffer.read(cx).snapshot(cx);
214
215 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
216
217 self.blocks.retain(|_id, block| {
218 if block.invalidation_anchor.is_valid(&snapshot) {
219 true
220 } else {
221 blocks_to_remove.insert(block.block_id);
222 false
223 }
224 });
225
226 if !blocks_to_remove.is_empty() {
227 self.editor
228 .update(cx, |editor, cx| {
229 editor.remove_blocks(blocks_to_remove, None, cx);
230 })
231 .ok();
232 cx.notify();
233 }
234 }
235 }
236
237 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
238 match &mut self.kernel {
239 Kernel::RunningKernel(kernel) => {
240 kernel.request_tx.try_send(message).ok();
241 }
242 _ => {}
243 }
244
245 anyhow::Ok(())
246 }
247
248 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
249 let blocks_to_remove: HashSet<BlockId> =
250 self.blocks.values().map(|block| block.block_id).collect();
251
252 self.editor
253 .update(cx, |editor, cx| {
254 editor.remove_blocks(blocks_to_remove, None, cx);
255 })
256 .ok();
257
258 self.blocks.clear();
259 }
260
261 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
262 let editor = if let Some(editor) = self.editor.upgrade() {
263 editor
264 } else {
265 return;
266 };
267
268 if code.is_empty() {
269 return;
270 }
271
272 let execute_request = ExecuteRequest {
273 code: code.to_string(),
274 ..ExecuteRequest::default()
275 };
276
277 let message: JupyterMessage = execute_request.into();
278
279 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
280
281 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
282
283 self.blocks.retain(|_key, block| {
284 if anchor_range.overlaps(&block.code_range, &buffer) {
285 blocks_to_remove.insert(block.block_id);
286 false
287 } else {
288 true
289 }
290 });
291
292 self.editor
293 .update(cx, |editor, cx| {
294 editor.remove_blocks(blocks_to_remove, None, cx);
295 })
296 .ok();
297
298 let status = match &self.kernel {
299 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
300 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
301 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
302 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
303 Kernel::Shutdown => ExecutionStatus::Shutdown,
304 };
305
306 let editor_block = if let Ok(editor_block) =
307 EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
308 {
309 editor_block
310 } else {
311 return;
312 };
313
314 self.blocks
315 .insert(message.header.msg_id.clone(), editor_block);
316
317 match &self.kernel {
318 Kernel::RunningKernel(_) => {
319 self.send(message, cx).ok();
320 }
321 Kernel::StartingKernel(task) => {
322 // Queue up the execution as a task to run after the kernel starts
323 let task = task.clone();
324 let message = message.clone();
325
326 cx.spawn(|this, mut cx| async move {
327 task.await;
328 this.update(&mut cx, |this, cx| {
329 this.send(message, cx).ok();
330 })
331 .ok();
332 })
333 .detach();
334 }
335 _ => {}
336 }
337 }
338
339 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
340 let parent_message_id = match message.parent_header.as_ref() {
341 Some(header) => &header.msg_id,
342 None => return,
343 };
344
345 match &message.content {
346 JupyterMessageContent::Status(status) => {
347 self.kernel.set_execution_state(&status.execution_state);
348 cx.notify();
349 }
350 JupyterMessageContent::KernelInfoReply(reply) => {
351 self.kernel.set_kernel_info(&reply);
352 cx.notify();
353 }
354 _ => {}
355 }
356
357 if let Some(block) = self.blocks.get_mut(parent_message_id) {
358 block.handle_message(&message, cx);
359 return;
360 }
361 }
362
363 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
364 match &mut self.kernel {
365 Kernel::RunningKernel(_kernel) => {
366 self.send(InterruptRequest {}.into(), cx).ok();
367 }
368 Kernel::StartingKernel(_task) => {
369 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
370 }
371 _ => {}
372 }
373 }
374
375 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
376 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
377
378 match kernel {
379 Kernel::RunningKernel(mut kernel) => {
380 let mut request_tx = kernel.request_tx.clone();
381
382 cx.spawn(|this, mut cx| async move {
383 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
384 request_tx.try_send(message).ok();
385
386 // Give the kernel a bit of time to clean up
387 cx.background_executor().timer(Duration::from_secs(3)).await;
388
389 kernel.process.kill().ok();
390
391 this.update(&mut cx, |this, cx| {
392 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
393 this.clear_outputs(cx);
394 this.kernel = Kernel::Shutdown;
395 cx.notify();
396 })
397 .ok();
398 })
399 .detach();
400 }
401 Kernel::StartingKernel(_kernel) => {
402 self.kernel = Kernel::Shutdown;
403 }
404 _ => {
405 self.kernel = Kernel::Shutdown;
406 }
407 }
408 cx.notify();
409 }
410}
411
412pub enum SessionEvent {
413 Shutdown(WeakView<Editor>),
414}
415
416impl EventEmitter<SessionEvent> for Session {}
417
418impl Render for Session {
419 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
420 let mut buttons = vec![];
421
422 buttons.push(
423 ButtonLike::new("shutdown")
424 .child(Label::new("Shutdown"))
425 .style(ButtonStyle::Subtle)
426 .on_click(cx.listener(move |session, _, cx| {
427 session.shutdown(cx);
428 })),
429 );
430
431 let status_text = match &self.kernel {
432 Kernel::RunningKernel(kernel) => {
433 buttons.push(
434 ButtonLike::new("interrupt")
435 .child(Label::new("Interrupt"))
436 .style(ButtonStyle::Subtle)
437 .on_click(cx.listener(move |session, _, cx| {
438 session.interrupt(cx);
439 })),
440 );
441 let mut name = self.kernel_specification.name.clone();
442
443 if let Some(info) = &kernel.kernel_info {
444 name.push_str(" (");
445 name.push_str(&info.language_info.name);
446 name.push_str(")");
447 }
448 name
449 }
450 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
451 Kernel::ErroredLaunch(err) => {
452 format!("{} (Error: {})", self.kernel_specification.name, err)
453 }
454 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
455 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
456 };
457
458 return v_flex()
459 .gap_1()
460 .child(
461 h_flex()
462 .gap_2()
463 .child(self.kernel.dot())
464 .child(Label::new(status_text)),
465 )
466 .child(h_flex().gap_2().children(buttons));
467 }
468}