-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathkernel.py
More file actions
370 lines (311 loc) · 10.9 KB
/
kernel.py
File metadata and controls
370 lines (311 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
from __future__ import annotations
import contextlib
import sys
import uuid
from itertools import chain
from pathlib import Path
from typing import cast
from IPython.display import SVG, Image
from metakernel import MetaKernel, ProcessMetaKernel, pexpect
from metakernel.process_metakernel import TextOutput
from .exceptions import GnuplotError
from .replwrap import PROMPT_RE, PROMPT_REMOVE_RE, GnuplotREPLWrapper
from .statement import STMT
from .utils import get_version
# Name of the gnuplot variable that holds the running image index. It is
# interpolated into the injected ``set output sprintf(...)`` statement and
# incremented right after it.
IMG_COUNTER = "__gpk_img_index"
# printf-style placeholder embedded in the image filename template; it is
# swapped for "*" when globbing for the image files.
IMG_COUNTER_FMT = "%03d"
class GnuplotKernel(ProcessMetaKernel):
    """
    GnuplotKernel

    Kernel that sends code to a gnuplot process wrapped by
    GnuplotREPLWrapper. When inline plotting is on, ``set output``
    statements are injected before plotting statements, and the image
    files matching the generated filename templates are displayed and
    then deleted.
    """

    implementation = "Gnuplot Kernel"
    implementation_version = get_version("gnuplot_kernel")
    language = "gnuplot"
    _banner = "Gnuplot Kernel"
    language_version = "5.0"  # pyright: ignore[reportAssignmentType,reportIncompatibleMethodOverride]
    language_info = {
        "mimetype": "text/x-gnuplot",
        "name": "gnuplot",
        "file_extension": ".gp",
        "codemirror_mode": "Octave",
        "help_links": MetaKernel.help_links,
    }
    kernel_json = {
        "argv": [
            sys.executable,
            "-m",
            "gnuplot_kernel",
            "-f",
            "{connection_file}",
        ],
        "display_name": "gnuplot",
        "language": "gnuplot",
        "name": "gnuplot",
    }

    # Whether "set output" statements are injected and images displayed
    inline_plotting = True
    # Code that is run after every execution when it is not empty
    reset_code = ""
    # True until the first execution, which applies the plot settings
    _first = True
    # Filename templates handed out since the last cleanup. This is a
    # class-level default; methods rebind it on the instance rather than
    # mutating it in place.
    _image_files: list[Path] = []
    # Not referenced by any method of this class
    _error = False
    wrapper: GnuplotREPLWrapper
    # Prompts that have already been warned about. Class-level default;
    # rebound on the instance rather than mutated in place.
    _bad_prompts: set = set()

    def check_prompt(self):
        """
        Print warning if the prompt looks bad

        A bad prompt is one that does not contain the string 'gnuplot>'.
        The warning is printed once per bad prompt.
        """
        prompt = cast("str", self.wrapper.prompt)
        if "gnuplot>" in prompt or prompt in self._bad_prompts:
            return

        print(f"Warning: The prompt is currently set to '{prompt}'")
        # Rebind instead of calling .add(): the default set lives on the
        # class, so an in-place mutation would be seen by every instance.
        self._bad_prompts = self._bad_prompts | {prompt}

    def do_execute_direct(self, code, silent=False):
        """
        Execute gnuplot code, printing any exception before re-raising it

        ``silent`` is accepted for signature compatibility and is not
        forwarded to the wrapped call.
        """
        # We wrap the real function so that gnuplot_kernel can
        # give a message when an exception occurs. Without
        # this, an exception happens silently
        try:
            return self._do_execute_direct(code)
        except Exception as err:
            print(f"Error: {err}")
            # Bare raise re-raises the active exception unchanged
            raise

    def _do_execute_direct(self, code: str) -> TextOutput | None:
        """
        Execute gnuplot code

        Applies the plot settings on the first call, injects the inline
        image statements when inline plotting is on, runs the code and
        the reset code, then displays (only on success) and deletes the
        generated images.

        Returns the output of the execution, or None if there is no
        output. The message of a GnuplotError is returned as output
        instead of being raised.
        """
        if self._first:
            # Clear the flag first: handle_plot_settings() calls
            # do_execute_direct(), which ends up back in this method.
            self._first = False
            self.handle_plot_settings()

        if self.inline_plotting:
            code = self.add_inline_image_statements(code)

        success = True

        try:
            result = super().do_execute_direct(code, silent=True)
        except GnuplotError as e:
            result = TextOutput(e.message)
            success = False

        if self.reset_code:
            super().do_execute_direct(self.reset_code, silent=True)

        if self.inline_plotting:
            if success:
                self.display_images()
            self.delete_image_files()

        self.check_prompt()

        # No empty strings
        return result if (result and result.output) else None

    def add_inline_image_statements(self, code: str) -> str:
        """
        Add 'set output ...' before every plotting statement

        This is what powers inline plotting. Returns the modified code;
        code with no lines yields just the trailing 'unset output'.
        """

        # "set output sprintf('foobar.%d.png', counter);"
        # "counter=counter+1"
        def set_output_inline(lines):
            tpl = self.get_image_filename()
            if tpl:
                cmd = (
                    f"set output sprintf('{tpl}', {IMG_COUNTER});"
                    f"{IMG_COUNTER}={IMG_COUNTER}+1"
                )
                lines.append(cmd)

        # We automatically create an output file for the following
        # cases if the user has not created one.
        #    - before every plot statement that is not in a
        #      multiplot block
        #    - before every multiplot block

        lines = []
        sm = StateMachine()
        is_joined_stmt = False
        inline_transitions = (
            ("none", "plot"),
            ("none", "multiplot"),
            ("plot", "plot"),
        )

        for line in code.splitlines():
            stmt = STMT(line)
            sm.transition(stmt)
            # Never inject in the middle of a statement that is being
            # continued from the previous line with a backslash
            add_inline_plot = (
                sm.prev_cur in inline_transitions and not is_joined_stmt
            )
            if add_inline_plot:
                set_output_inline(lines)

            lines.append(stmt)
            is_joined_stmt = stmt.strip().endswith("\\")

        # Make gnuplot flush the output. The emptiness check keeps code
        # without any lines from raising IndexError on lines[-1].
        if not lines or not lines[-1].endswith("\\"):
            lines.append("unset output")

        code = "\n".join(lines)
        return code

    def get_image_filename(self):
        """
        Generate the filename template gnuplot will write a plot to

        The template contains the IMG_COUNTER_FMT placeholder and is
        remembered in ``_image_files`` for later display and cleanup.

        Returns the template as a Path.
        """
        # we could use tempfile.NamedTemporaryFile but we do not
        # want to create the file, gnuplot will create it.
        # Later on when we check if the file exists we know
        # whodunnit.
        # NOTE(review): the hard-coded /tmp looks POSIX-only although
        # makeWrapper() also looks for gnuplot.exe -- confirm.
        fmt = self.plot_settings["format"]
        filename = Path(
            f"/tmp/gnuplot-inline-{uuid.uuid1()}.{IMG_COUNTER_FMT}.{fmt}"
        )
        # Rebind instead of calling .append(): the default list lives on
        # the class, so an in-place mutation would be seen by every
        # instance.
        self._image_files = [*self._image_files, filename]
        return filename

    def iter_image_files(self):
        """
        Iterate over the image files

        Every remembered filename template is expanded by globbing with
        the counter placeholder replaced by "*"; the matches of each
        template are yielded in sorted order.
        """
        # The list is built eagerly so that the globbing happens when
        # this method is called, not while the result is consumed.
        matches = [
            sorted(f.parent.glob(f.name.replace(IMG_COUNTER_FMT, "*")))
            for f in self._image_files
        ]
        return chain.from_iterable(matches)

    def display_images(self):
        """
        Display images if gnuplot wrote to them

        Does nothing when inline plotting is off. A missing or empty
        file results in a printed warning instead of an image.
        """
        if not self.inline_plotting:
            return

        settings = self.plot_settings
        _Image = SVG if settings["format"] == "svg" else Image

        for filename in self.iter_image_files():
            try:
                size = filename.stat().st_size
            except FileNotFoundError:
                size = 0

            if not size:
                msg = (
                    "Failed to read and display image file from gnuplot. "
                    "Possibly:\n"
                    "1. You have plotted to a non interactive terminal.\n"
                    "2. You have an invalid expression."
                )
                print(msg)
                continue

            im = _Image(str(filename))
            self.Display(im)

    def delete_image_files(self):
        """
        Delete the image files

        Also forgets the remembered filename templates.
        """
        # After display_images(), the real images are
        # no longer required.
        for filename in self.iter_image_files():
            with contextlib.suppress(FileNotFoundError):
                filename.unlink()

        self._image_files = []

    def makeWrapper(self):
        """
        Start gnuplot and return wrapper around the REPL

        Raises RuntimeError if neither 'gnuplot' nor 'gnuplot.exe' can
        be found.
        """
        if pexpect.which("gnuplot"):
            program = "gnuplot"
        elif pexpect.which("gnuplot.exe"):
            program = "gnuplot.exe"
        else:
            raise RuntimeError("gnuplot not found.")

        # We don't want help commands getting stuck,
        # use a non interactive PAGER
        if pexpect.which("env") and pexpect.which("cat"):
            command = f"env PAGER=cat {program}"
        else:
            command = program

        wrapper = GnuplotREPLWrapper(
            cmd_or_spawn=command,
            prompt_regex=PROMPT_RE,
            prompt_change_cmd=None,
        )
        # No sleeping before sending commands to gnuplot
        wrapper.child.delaybeforesend = 0
        return wrapper

    def do_shutdown(self, restart):
        """
        Exit the gnuplot process and any other underlying stuff
        """
        # NOTE(review): guards against a shutdown that arrives before a
        # wrapper exists -- presumably the base class creates it lazily;
        # confirm.
        wrapper = getattr(self, "wrapper", None)
        if wrapper is not None:
            wrapper.exit()
        super().do_shutdown(restart)

    def get_kernel_help_on(self, info, level=0, none_on_fail=False):
        """
        Return gnuplot's help text for ``info["help_obj"]``

        Only single-word help objects are looked up. When there is
        nothing to look up, or the lookup produces no output, the
        result is None if ``none_on_fail`` is true and "" otherwise.
        """
        failure = None if none_on_fail else ""

        obj = info.get("help_obj", "")
        if not obj or len(obj.split()) > 1:
            return failure

        res = cast(
            "TextOutput | None", self.do_execute_direct(f"help {obj}")
        )
        # do_execute_direct() returns None when there is no output
        if res is None:
            return failure

        text = PROMPT_REMOVE_RE.sub("", res.output)
        self.check_prompt()
        return text

    def reset_image_counter(self):
        """
        Set the gnuplot image counter variable back to zero
        """
        # Incremented after every plot image, and used in the
        # plot image filename. Makes plotting in "do for" loops
        # work
        cmd = f"{IMG_COUNTER}=0"
        self.do_execute_direct(cmd)

    def handle_plot_settings(self):
        """
        Handle the current plot settings

        This is used by the gnuplot line magic. The plot magic
        is inadequate.

        Fills in a default termspec and format when they are missing or
        empty, updates ``inline_plotting`` from the backend setting,
        sets the gnuplot terminal and resets the image counter.
        """
        settings = self.plot_settings
        if not settings.get("termspec"):
            settings["termspec"] = 'pngcairo size 385, 256 font "Arial,10"'
        if not settings.get("format"):
            settings["format"] = "png"

        self.inline_plotting = settings["backend"] == "inline"

        cmd = f"set terminal {settings['termspec']}"
        self.do_execute_direct(cmd)
        self.reset_image_counter()
class StateMachine:
    """
    Track context given gnuplot statements

    This is used to help us tell when to inject commands (i.e. set output)
    for inline plotting in the notebook.

    Statements are fed in one at a time through transition(). After each
    call, prev_cur holds the state before and the state after that
    statement.
    """

    states = ["none", "plot", "output", "multiplot", "output_multiplot"]
    previous = "none"
    _current = "none"

    @property
    def prev_cur(self):
        """
        The (previous, current) pair of states
        """
        return (self.previous, self.current)

    @property
    def current(self):
        """
        The current state. Assigning to it also records the state being
        replaced as the previous one.
        """
        return self._current

    @current.setter
    def current(self, value):
        self.previous = self._current
        self._current = value

    def transition(self, stmt):
        """
        Advance the machine given the next statement

        Dispatches to the ``transition_from_<state>`` handler of the
        current state. Raises KeyError if the current state is not one
        of ``states``.
        """
        state = self.current
        if state not in self.states:
            raise KeyError(state)

        handler = getattr(self, f"transition_from_{state}")
        # A handler that does not change the state never touches
        # `previous`, so record the self-transition up front.
        self.previous = self._current
        return handler(stmt)

    def transition_from_plot(self, stmt):
        """
        Handle a statement seen while in the "plot" state

        Another plot stays in "plot", a set output moves to "output"
        and anything else falls back to "none".
        """
        if stmt.is_plot():
            self.current = "plot"
        elif stmt.is_set_output():
            self.current = "output"
        else:
            self.current = "none"

    def transition_from_none(self, stmt):
        """
        Handle a statement seen while in the "none" state
        """
        if stmt.is_plot():
            self.current = "plot"
        elif stmt.is_set_output():
            self.current = "output"
        elif stmt.is_set_multiplot():
            self.current = "multiplot"

    def transition_from_output(self, stmt):
        """
        Handle a statement seen while in the "output" state
        """
        if stmt.is_plot():
            self.current = "plot"
        elif stmt.is_set_multiplot():
            self.current = "output_multiplot"
        elif stmt.is_unset_output():
            self.current = "none"

    def transition_from_multiplot(self, stmt):
        """
        Handle a statement seen while in the "multiplot" state

        Only an unset multiplot leaves the state.
        """
        if stmt.is_unset_multiplot():
            self.current = "none"

    def transition_from_output_multiplot(self, stmt):
        """
        Handle a statement seen while in the "output_multiplot" state

        Only an unset multiplot leaves the state, going back to
        "output".
        """
        if stmt.is_unset_multiplot():
            self.current = "output"