1use crate::{
2 kernels::{Kernel, KernelSpecification, RunningKernel},
3 outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
4};
5use collections::{HashMap, HashSet};
6use editor::{
7 display_map::{
8 BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, RenderBlock,
9 },
10 Anchor, AnchorRangeExt as _, Editor,
11};
12use futures::{FutureExt as _, StreamExt as _};
13use gpui::{div, prelude::*, Entity, EventEmitter, Render, Task, View, ViewContext};
14use project::Fs;
15use runtimelib::{
16 ExecuteRequest, InterruptRequest, JupyterMessage, JupyterMessageContent, KernelInfoRequest,
17 ShutdownRequest,
18};
19use settings::Settings as _;
20use std::{ops::Range, sync::Arc, time::Duration};
21use theme::{ActiveTheme, ThemeSettings};
22use ui::{h_flex, prelude::*, v_flex, ButtonLike, ButtonStyle, Label};
23
24pub struct Session {
25 editor: View<Editor>,
26 kernel: Kernel,
27 blocks: HashMap<String, EditorBlock>,
28 messaging_task: Task<()>,
29 kernel_specification: KernelSpecification,
30}
31
32#[derive(Debug)]
33struct EditorBlock {
34 editor: View<Editor>,
35 code_range: Range<Anchor>,
36 block_id: BlockId,
37 execution_view: View<ExecutionView>,
38}
39
40impl EditorBlock {
41 fn new(
42 editor: View<Editor>,
43 code_range: Range<Anchor>,
44 status: ExecutionStatus,
45 cx: &mut ViewContext<Session>,
46 ) -> Self {
47 let execution_view = cx.new_view(|cx| ExecutionView::new(status, cx));
48
49 let block_id = editor.update(cx, |editor, cx| {
50 let block = BlockProperties {
51 position: code_range.end,
52 height: execution_view.num_lines(cx).saturating_add(1),
53 style: BlockStyle::Sticky,
54 render: Self::create_output_area_render(execution_view.clone()),
55 disposition: BlockDisposition::Below,
56 };
57
58 editor.insert_blocks([block], None, cx)[0]
59 });
60
61 Self {
62 editor,
63 code_range,
64 block_id,
65 execution_view,
66 }
67 }
68
69 fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
70 self.execution_view.update(cx, |execution_view, cx| {
71 execution_view.push_message(&message.content, cx);
72 });
73
74 self.editor.update(cx, |editor, cx| {
75 let mut replacements = HashMap::default();
76 replacements.insert(
77 self.block_id,
78 (
79 Some(self.execution_view.num_lines(cx).saturating_add(1)),
80 Self::create_output_area_render(self.execution_view.clone()),
81 ),
82 );
83 editor.replace_blocks(replacements, None, cx);
84 })
85 }
86
87 fn create_output_area_render(execution_view: View<ExecutionView>) -> RenderBlock {
88 let render = move |cx: &mut BlockContext| {
89 let execution_view = execution_view.clone();
90 let text_font = ThemeSettings::get_global(cx).buffer_font.family.clone();
91 // Note: we'll want to use `cx.anchor_x` when someone runs something with no output -- just show a checkmark and not make the full block below the line
92
93 let gutter_width = cx.gutter_dimensions.width;
94
95 h_flex()
96 .w_full()
97 .bg(cx.theme().colors().background)
98 .border_y_1()
99 .border_color(cx.theme().colors().border)
100 .pl(gutter_width)
101 .child(
102 div()
103 .font_family(text_font)
104 // .ml(gutter_width)
105 .mx_1()
106 .my_2()
107 .h_full()
108 .w_full()
109 .mr(gutter_width)
110 .child(execution_view),
111 )
112 .into_any_element()
113 };
114
115 Box::new(render)
116 }
117}
118
119impl Session {
120 pub fn new(
121 editor: View<Editor>,
122 fs: Arc<dyn Fs>,
123 kernel_specification: KernelSpecification,
124 cx: &mut ViewContext<Self>,
125 ) -> Self {
126 let entity_id = editor.entity_id();
127 let kernel = RunningKernel::new(kernel_specification.clone(), entity_id, fs.clone(), cx);
128
129 let pending_kernel = cx
130 .spawn(|this, mut cx| async move {
131 let kernel = kernel.await;
132
133 match kernel {
134 Ok((kernel, mut messages_rx)) => {
135 this.update(&mut cx, |this, cx| {
136 // At this point we can create a new kind of kernel that has the process and our long running background tasks
137 this.kernel = Kernel::RunningKernel(kernel);
138
139 this.messaging_task = cx.spawn(|session, mut cx| async move {
140 while let Some(message) = messages_rx.next().await {
141 session
142 .update(&mut cx, |session, cx| {
143 session.route(&message, cx);
144 })
145 .ok();
146 }
147 });
148
149 // For some reason sending a kernel info request will brick the ark (R) kernel.
150 // Note that Deno and Python do not have this issue.
151 if this.kernel_specification.name == "ark" {
152 return;
153 }
154
155 // Get kernel info after (possibly) letting the kernel start
156 cx.spawn(|this, mut cx| async move {
157 cx.background_executor()
158 .timer(Duration::from_millis(120))
159 .await;
160 this.update(&mut cx, |this, _cx| {
161 this.send(KernelInfoRequest {}.into(), _cx).ok();
162 })
163 .ok();
164 })
165 .detach();
166 })
167 .ok();
168 }
169 Err(err) => {
170 this.update(&mut cx, |this, _cx| {
171 this.kernel = Kernel::ErroredLaunch(err.to_string());
172 })
173 .ok();
174 }
175 }
176 })
177 .shared();
178
179 return Self {
180 editor,
181 kernel: Kernel::StartingKernel(pending_kernel),
182 messaging_task: Task::ready(()),
183 blocks: HashMap::default(),
184 kernel_specification,
185 };
186 }
187
188 fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
189 match &mut self.kernel {
190 Kernel::RunningKernel(kernel) => {
191 kernel.request_tx.try_send(message).ok();
192 }
193 _ => {}
194 }
195
196 anyhow::Ok(())
197 }
198
199 pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
200 let blocks_to_remove: HashSet<BlockId> =
201 self.blocks.values().map(|block| block.block_id).collect();
202
203 self.editor.update(cx, |editor, cx| {
204 editor.remove_blocks(blocks_to_remove, None, cx);
205 });
206
207 self.blocks.clear();
208 }
209
210 pub fn execute(&mut self, code: &str, anchor_range: Range<Anchor>, cx: &mut ViewContext<Self>) {
211 let execute_request = ExecuteRequest {
212 code: code.to_string(),
213 ..ExecuteRequest::default()
214 };
215
216 let message: JupyterMessage = execute_request.into();
217
218 let mut blocks_to_remove: HashSet<BlockId> = HashSet::default();
219
220 let buffer = self.editor.read(cx).buffer().read(cx).snapshot(cx);
221
222 self.blocks.retain(|_key, block| {
223 if anchor_range.overlaps(&block.code_range, &buffer) {
224 blocks_to_remove.insert(block.block_id);
225 false
226 } else {
227 true
228 }
229 });
230
231 self.editor.update(cx, |editor, cx| {
232 editor.remove_blocks(blocks_to_remove, None, cx);
233 });
234
235 let status = match &self.kernel {
236 Kernel::RunningKernel(_) => ExecutionStatus::Queued,
237 Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
238 Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
239 Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
240 Kernel::Shutdown => ExecutionStatus::Shutdown,
241 };
242
243 let editor_block = EditorBlock::new(self.editor.clone(), anchor_range, status, cx);
244
245 self.blocks
246 .insert(message.header.msg_id.clone(), editor_block);
247
248 match &self.kernel {
249 Kernel::RunningKernel(_) => {
250 self.send(message, cx).ok();
251 }
252 Kernel::StartingKernel(task) => {
253 // Queue up the execution as a task to run after the kernel starts
254 let task = task.clone();
255 let message = message.clone();
256
257 cx.spawn(|this, mut cx| async move {
258 task.await;
259 this.update(&mut cx, |this, cx| {
260 this.send(message, cx).ok();
261 })
262 .ok();
263 })
264 .detach();
265 }
266 _ => {}
267 }
268 }
269
270 fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
271 let parent_message_id = match message.parent_header.as_ref() {
272 Some(header) => &header.msg_id,
273 None => return,
274 };
275
276 match &message.content {
277 JupyterMessageContent::Status(status) => {
278 self.kernel.set_execution_state(&status.execution_state);
279 cx.notify();
280 }
281 JupyterMessageContent::KernelInfoReply(reply) => {
282 self.kernel.set_kernel_info(&reply);
283 cx.notify();
284 }
285 _ => {}
286 }
287
288 if let Some(block) = self.blocks.get_mut(parent_message_id) {
289 block.handle_message(&message, cx);
290 return;
291 }
292 }
293
294 fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
295 match &mut self.kernel {
296 Kernel::RunningKernel(_kernel) => {
297 self.send(InterruptRequest {}.into(), cx).ok();
298 }
299 Kernel::StartingKernel(_task) => {
300 // NOTE: If we switch to a literal queue instead of chaining on to the task, clear all queued executions
301 }
302 _ => {}
303 }
304 }
305
306 fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
307 let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
308 // todo!(): emit event for the runtime panel to remove this session once in shutdown state
309
310 match kernel {
311 Kernel::RunningKernel(mut kernel) => {
312 let mut request_tx = kernel.request_tx.clone();
313
314 cx.spawn(|this, mut cx| async move {
315 let message: JupyterMessage = ShutdownRequest { restart: false }.into();
316 request_tx.try_send(message).ok();
317
318 // Give the kernel a bit of time to clean up
319 cx.background_executor().timer(Duration::from_secs(3)).await;
320
321 kernel.process.kill().ok();
322
323 this.update(&mut cx, |this, cx| {
324 cx.emit(SessionEvent::Shutdown(this.editor.clone()));
325 this.clear_outputs(cx);
326 this.kernel = Kernel::Shutdown;
327 cx.notify();
328 })
329 .ok();
330 })
331 .detach();
332 }
333 Kernel::StartingKernel(_kernel) => {
334 self.kernel = Kernel::Shutdown;
335 }
336 _ => {
337 self.kernel = Kernel::Shutdown;
338 }
339 }
340 cx.notify();
341 }
342}
343
344pub enum SessionEvent {
345 Shutdown(View<Editor>),
346}
347
348impl EventEmitter<SessionEvent> for Session {}
349
350impl Render for Session {
351 fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
352 let mut buttons = vec![];
353
354 buttons.push(
355 ButtonLike::new("shutdown")
356 .child(Label::new("Shutdown"))
357 .style(ButtonStyle::Subtle)
358 .on_click(cx.listener(move |session, _, cx| {
359 session.shutdown(cx);
360 })),
361 );
362
363 let status_text = match &self.kernel {
364 Kernel::RunningKernel(kernel) => {
365 buttons.push(
366 ButtonLike::new("interrupt")
367 .child(Label::new("Interrupt"))
368 .style(ButtonStyle::Subtle)
369 .on_click(cx.listener(move |session, _, cx| {
370 session.interrupt(cx);
371 })),
372 );
373 let mut name = self.kernel_specification.name.clone();
374
375 if let Some(info) = &kernel.kernel_info {
376 name.push_str(" (");
377 name.push_str(&info.language_info.name);
378 name.push_str(")");
379 }
380 name
381 }
382 Kernel::StartingKernel(_) => format!("{} (Starting)", self.kernel_specification.name),
383 Kernel::ErroredLaunch(err) => {
384 format!("{} (Error: {})", self.kernel_specification.name, err)
385 }
386 Kernel::ShuttingDown => format!("{} (Shutting Down)", self.kernel_specification.name),
387 Kernel::Shutdown => format!("{} (Shutdown)", self.kernel_specification.name),
388 };
389
390 return v_flex()
391 .gap_1()
392 .child(
393 h_flex()
394 .gap_2()
395 .child(self.kernel.dot())
396 .child(Label::new(status_text)),
397 )
398 .child(h_flex().gap_2().children(buttons));
399 }
400}