1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
11};
12use futures::{FutureExt as _, StreamExt as _};
13use gpui::{
14 div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
15};
16use language::Point;
17use project::Fs;
18use runtimelib::{
19 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, ShutdownRequest,
20};
21use settings::Settings as _;
22use std::{env::temp_dir, ops::Range, path::PathBuf, sync::Arc, time::Duration};
23use theme::{ActiveTheme, ThemeSettings};
24use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
25
26pub struct Session {
27 pub editor: WeakView<Editor>,
28 pub kernel: Kernel,
29 blocks: HashMap<String, EditorBlock>,
30 pub messaging_task: Task<()>,
31 pub kernel_specification: KernelSpecification,
32 _buffer_subscription: Subscription,
33}
34
35struct EditorBlock {
36 editor: WeakView<Editor>,
37 code_range: Range<Anchor>,
38 invalidation_anchor: Anchor,
39 block_id: BlockId,
40 execution_view: View<ExecutionView>,
41}
42
43impl EditorBlock {
44 fn new(
45 editor: WeakView<Editor>,
46 code_range: Range<Anchor>,
47 status: ExecutionStatus,
48 cx: &mut ViewContext<Session>,
49 ) -> anyhow::Result<Self> {
50 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
51
52 let (block_id, invalidation_anchor) = editor.update(cx, |editor, cx| {
53 let buffer = editor.buffer().clone();
54 let buffer_snapshot = buffer.read(cx).snapshot(cx);
55 let end_point = code_range.end.to_point(&buffer_snapshot);
56 let next_row_start = end_point + Point::new(1, 0);
57 if next_row_start > buffer_snapshot.max_point() {
58 buffer.update(cx, |buffer, cx| {
59 buffer.edit(
60 [(
61 buffer_snapshot.max_point()..buffer_snapshot.max_point(),
62 "\n",
63 )],
64 None,
65 cx,
66 )
67 });
68 }
69
70 let invalidation_anchor = buffer.read(cx).read(cx).anchor_before(next_row_start);
71 let block = BlockProperties {
72 position: code_range.end,
73 height: execution_view.num_lines(cx).saturating_add(1),
74 style: BlockStyle::Sticky,
75 render: Self::create_output_area_render(execution_view.clone()),
76 disposition: BlockDisposition::Below,
77 };
78
79 let block_id = editor.insert_blocks([block], None, cx)[0];
80 (block_id, invalidation_anchor)
81 })?;
82
83 anyhow::Ok(Self {
84 editor,
85 code_range,
86 invalidation_anchor,
87 block_id,
88 execution_view,
89 })
90 }
91
92 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
93 self.execution_view.update(cx, |execution_view, cx| {
94 execution_view.push_message(&message.content, cx);
95 });
96
97 self.editor
98 .update(cx, |editor, cx| {
99 let mut replacements = HashMap::default();
100 replacements.insert(
101 self.block_id,
102 (
103 Some(self.execution_view.num_lines(cx).saturating_add(1)),
104 Self::create_output_area_render(self.execution_view.clone()),
105 ),
106 );
107 editor.replace_blocks(replacements, None, cx);
108 })
109 .ok();
110 }
111
112 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
113 let render = move |cx: &mut BlockContext| {
114 let execution_view = execution_view.clone();
115 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
116 let text_font_size = ThemeSettings::get_global(cx).buffer_font_size;
117 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
118
119 let gutter_width = cx.gutter_dimensions.width;
120
121 h_flex()
122 .w_full()
123 .bg(cx.theme().colors().background)
124 .border_y_1()
125 .border_color(cx.theme().colors().border)
126 .pl(gutter_width)
127 .child(
128 div()
129 .text_size(text_font_size)
130 .font_family(text_font)
131 // .ml(gutter_width)
132 .mx_1()
133 .my_2()
134 .h_full()
135 .w_full()
136 .mr(gutter_width)
137 .child(execution_view),
138 )
139 .into_any_element()
140 };
141
142 Box::new(render)
143 }
144}
145
146impl Session {
147 pub fn working_directory(editor: WeakView<Editor>, cx: &WindowContext) -> PathBuf {
148 if let Some(working_directory) = editor
149 .upgrade()
150 .and_then(|editor| editor.read(cx).working_directory(cx))
151 {
152 working_directory
153 } else {
154 temp_dir()
155 }
156 }
157
158 pub fn new(
159 editor: WeakView<Editor>,
160 fs: Arc<dyn Fs>,
161 kernel_specification: KernelSpecification,
162 cx: &mut ViewContext<Self>,
163 ) -> Self {
164 let entity_id = editor.entity_id();
165
166 let kernel = RunningKernel::new(
167 kernel_specification.clone(),
168 entity_id,
169 Self::working_directory(editor.clone(), cx),
170 fs.clone(),
171 cx,
172 );
173
174 let pending_kernel = cx
175 .spawn(|this, mut cx| async move {
176 let kernel = kernel.await;
177
178 match kernel {
179 Ok((kernel, mut messages_rx)) => {
180 this.update(&mut cx, |this, cx| {
181 // At this point we can create a new kind of kernel that has the process and our long running background tasks
182 this.kernel = Kernel::RunningKernel(kernel);
183
184 this.messaging_task = cx.spawn(|session, mut cx| async move {
185 while let Some(message) = messages_rx.next().await {
186 session
187 .update(&mut cx, |session, cx| {
188 session.route(&message, cx);
189 })
190 .ok();
191 }
192 });
193 })
194 .ok();
195 }
196 Err(err) => {
197 this.update(&mut cx, |this, _cx| {
198 this.kernel = Kernel::ErroredLaunch(err.to_string());
199 })
200 .ok();
201 }
202 }
203 })
204 .shared();
205
206 let subscription = match editor.upgrade() {
207 Some(editor) => {
208 let buffer = editor.read(cx).buffer().clone();
209 cx.subscribe(&buffer, Self::on_buffer_event)
210 }
211 None => Subscription::new(|| {}),
212 };
213
214 return Self {
215 editor,
216 kernel: Kernel::StartingKernel(pending_kernel),
217 messaging_task: Task::ready(()),
218 blocks: HashMap::default(),
219 kernel_specification,
220 _buffer_subscription: subscription,
221 };
222 }
223
224 fn on_buffer_event(
225 &mut self,
226 buffer: Model<MultiBuffer>,
227 event: &multi_buffer::Event,
228 cx: &mut ViewContext<Self>,
229 ) {
230 if let multi_buffer::Event::Edited { .. } = event {
231 let snapshot = buffer.read(cx).snapshot(cx);
232
233 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
234
235 self.blocks.retain(|_id, block| {
236 if block.invalidation_anchor.is_valid(&snapshot) {
237 true
238 } else {
239 blocks_to_remove.insert(block.block_id);
240 false
241 }
242 });
243
244 if !blocks_to_remove.is_empty() {
245 self.editor
246 .update(cx, |editor, cx| {
247 editor.remove_blocks(blocks_to_remove, None, cx);
248 })
249 .ok();
250 cx.notify();
251 }
252 }
253 }
254
255 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
256 match &mut self.kernel {
257 Kernel::RunningKernel(kernel) => {
258 kernel.request_tx.try_send(message).ok();
259 }
260 _ => {}
261 }
262
263 anyhow::Ok(())
264 }
265
266 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
267 let blocks_to_remove: HashSet<BlockId> =
268 self.blocks.values().map(|block| block.block_id).collect();
269
270 self.editor
271 .update(cx, |editor, cx| {
272 editor.remove_blocks(blocks_to_remove, None, cx);
273 })
274 .ok();
275
276 self.blocks.clear();
277 }
278
279 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
280 let editor = if let Some(editor) = self.editor.upgrade() {
281 editor
282 } else {
283 return;
284 };
285
286 if code.is_empty() {
287 return;
288 }
289
290 let execute_request = ExecuteRequest {
291 code: code.to_string(),
292 ..ExecuteRequest::default()
293 };
294
295 let message: JupyterMessage = execute_request.into();
296
297 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
298
299 let buffer = editor.read(cx).buffer().read(cx).snapshot(cx);
300
301 self.blocks.retain(|_key, block| {
302 if anchor_range.overlaps(&block.code_range, &buffer) {
303 blocks_to_remove.insert(block.block_id);
304 false
305 } else {
306 true
307 }
308 });
309
310 self.editor
311 .update(cx, |editor, cx| {
312 editor.remove_blocks(blocks_to_remove, None, cx);
313 })
314 .ok();
315
316 let status = match &self.kernel {
317 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
318 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
319 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
320 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
321 Kernel::Shutdown => ExecutionStatus::Shutdown,
322 };
323
324 let editor_block = if let Ok(editor_block) =
325 EditorBlock::new(self.editor.clone(), anchor_range, status, cx)
326 {
327 editor_block
328 } else {
329 return;
330 };
331
332 self.blocks
333 .insert(message.header.msg_id.clone(), editor_block);
334
335 match &self.kernel {
336 Kernel::RunningKernel(_) => {
337 self.send(message, cx).ok();
338 }
339 Kernel::StartingKernel(task) => {
340 // Queue up the execution as a task to run after the kernel starts
341 let task = task.clone();
342 let message = message.clone();
343
344 cx.spawn(|this, mut cx| async move {
345 task.await;
346 this.update(&mut cx, |this, cx| {
347 this.send(message, cx).ok();
348 })
349 .ok();
350 })
351 .detach();
352 }
353 _ => {}
354 }
355 }
356
357 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
358 let parent_message_id = match message.parent_header.as_ref() {
359 Some(header) => &header.msg_id,
360 None => return,
361 };
362
363 match &message.content {
364 JupyterMessageContent::Status(status) => {
365 self.kernel.set_execution_state(&status.execution_state);
366 cx.notify();
367 }
368 JupyterMessageContent::KernelInfoReply(reply) => {
369 self.kernel.set_kernel_info(&reply);
370 cx.notify();
371 }
372 _ => {}
373 }
374
375 if let Some(block) = self.blocks.get_mut(parent_message_id) {
376 block.handle_message(&message, cx);
377 return;
378 }
379 }
380
381 pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
382 match &mut self.kernel {
383 Kernel::RunningKernel(_kernel) => {
384 self.send(InterruptRequest {}.into(), cx).ok();
385 }
386 Kernel::StartingKernel(_task) => {
387 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
388 }
389 _ => {}
390 }
391 }
392
393 pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
394 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
395
396 match kernel {
397 Kernel::RunningKernel(mut kernel) => {
398 let mut request_tx = kernel.request_tx.clone();
399
400 cx.spawn(|this, mut cx| async move {
401 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
402 request_tx.try_send(message).ok();
403
404 // Give the kernel a bit of time to clean up
405 cx.background_executor().timer(Duration::from_secs(3)).await;
406
407 kernel.process.kill().ok();
408
409 this.update(&mut cx, |this, cx| {
410 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
411 this.clear_outputs(cx);
412 this.kernel = Kernel::Shutdown;
413 cx.notify();
414 })
415 .ok();
416 })
417 .detach();
418 }
419 Kernel::StartingKernel(_kernel) => {
420 self.kernel = Kernel::Shutdown;
421 }
422 _ => {
423 self.kernel = Kernel::Shutdown;
424 }
425 }
426 cx.notify();
427 }
428}
429
430pub enum SessionEvent {
431 Shutdown(WeakView<Editor>),
432}
433
434impl EventEmitter<SessionEvent> for Session {}
435
436impl Render for Session {
437 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
438 let mut buttons = vec![];
439
440 buttons.push(
441 ButtonLike::new("shutdown")
442 .child(Label::new("Shutdown"))
443 .style(ButtonStyle::Subtle)
444 .on_click(cx.listener(move |session, _, cx| {
445 session.shutdown(cx);
446 })),
447 );
448
449 let status_text = match &self.kernel {
450 Kernel::RunningKernel(kernel) => {
451 buttons.push(
452 ButtonLike::new("interrupt")
453 .child(Label::new("Interrupt"))
454 .style(ButtonStyle::Subtle)
455 .on_click(cx.listener(move |session, _, cx| {
456 session.interrupt(cx);
457 })),
458 );
459 let mut name = self.kernel_specification.name.clone();
460
461 if let Some(info) = &kernel.kernel_info {
462 name.push_str(" (");
463 name.push_str(&info.language_info.name);
464 name.push_str(")");
465 }
466 name
467 }
468 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
469 Kernel::ErroredLaunch(err) => {
470 format!("{} (Error: {})", self.kernel_specification.name, err)
471 }
472 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
473 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
474 };
475
476 return v_flex()
477 .gap_1()
478 .child(
479 h_flex()
480 .gap_2()
481 .child(self.kernel.dot())
482 .child(Label::new(status_text)),
483 )
484 .child(h_flex().gap_2().children(buttons));
485 }
486}