-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathreplwrap.py
More file actions
254 lines (217 loc) · 6.97 KB
/
replwrap.py
File metadata and controls
254 lines (217 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import re
import signal
import textwrap
from typing import cast
from metakernel import REPLWrapper
from metakernel.pexpect import TIMEOUT
from .exceptions import GnuplotError
# Line ending that is normalised to "\n" when reading gnuplot output
CRLF = "\r\n"

# Value of the current block name when no block is being read
NO_BLOCK = ""

# Patterns that identify error output
ERROR_RE = [
    re.compile(
        r"^\s*"
        r"\^"  # A caret indicates an error on the line above it
        r"\s*"
        r"\n"
    )
]

# The prompt at the end of the output, most likely "gnuplot> "
PROMPT_RE = re.compile(r"\w*>\s*$")

# A prompt anywhere in the output
PROMPT_REMOVE_RE = re.compile(r"\w*>\s*")

# Data block e.g.
# $DATA << EOD
# # x y
# 1 1
# 2 2
# 3 3
# EOD

# First line of a data block, e.g. "$DATA << EOD".
# The terminating word is captured in the group "end".
START_DATABLOCK_RE = re.compile(r"^\$\w+\s+<<\s*(?P<end>\w+)$")

# A line made up of a single word, e.g. "EOD".
# The word is captured in the group "end".
END_DATABLOCK_RE = re.compile(r"^(?P<end>\w+)$")
class GnuplotREPLWrapper(REPLWrapper):
    """
    REPL wrapper that feeds statements to a gnuplot child process

    Code is validated, split into statements (a whole data block
    counts as one statement) and sent one statement at a time,
    waiting for the prompt after each of them.
    """

    # The prompt after the commands run
    prompt = ""

    # Known block types, each with the pattern that starts the
    # block and the pattern that may end it
    _blocks = {
        "data": {"start_re": START_DATABLOCK_RE, "end_re": END_DATABLOCK_RE}
    }

    # Name of the block being read by _splitlines, NO_BLOCK if none
    _current_block = NO_BLOCK

    def exit(self):
        """
        Exit the gnuplot process

        If a prompt cannot be obtained the child process is killed,
        otherwise the "exit" command is sent to it.
        """
        try:
            self._force_prompt(timeout=0.01)
        except GnuplotError:
            return self.child.kill(signal.SIGKILL)

        self.sendline("exit")

    def is_error_output(self, text):
        """
        Return True if text is recognised as error text
        """
        return any(pattern.match(text) for pattern in ERROR_RE)

    def validate_input(self, code):
        """
        Deal with problematic input

        Parameters
        ----------
        code : str
            Code to be executed.

        Returns
        -------
        code : str
            The code with every backslash-newline replaced by a space.

        Raises
        ------
        GnuplotError
            If the code ends with a backslash.
        """
        if code.endswith("\\"):
            raise GnuplotError("Do not execute code that endswith backslash.")

        # Do not get stuck in the gnuplot process
        code = code.replace("\\\n", " ")
        return code

    def send(self, cmd):
        """
        Send cmd to the child process followed by a carriage return
        """
        self.child.send(cmd + "\r")

    def _force_prompt(self, timeout: float = 30, n=4):
        """
        Force prompt

        Parameters
        ----------
        timeout : float
            Seconds to wait for the prompt once all the quick
            attempts have failed.
        n : int
            Number of quick attempts to make before the long wait.

        Raises
        ------
        GnuplotError
            If the prompt is not seen in any of the attempts.
        """
        # A quick attempt never waits longer than the full timeout
        quick_timeout = min(0.05, timeout)

        def prompt_within(seconds):
            # True if the prompt is seen before the time runs out
            try:
                self._expect_prompt(timeout=seconds)
            except TIMEOUT:
                return False
            return True

        # Eagerly try to get a prompt quickly,
        # If that fails wait a while
        for _ in range(n):
            if prompt_within(quick_timeout):
                return

            # Probably stuck in help output
            if self.child.before:
                self.send(self.child.linesep)

        # Probably long computation going on
        if not prompt_within(timeout):
            msg = (
                "gnuplot prompt failed to return in {} seconds"
            ).format(timeout)
            raise GnuplotError(msg)

    def _end_of_block(self, stmt, end_string):
        """
        Detect the end of block statements

        Parameters
        ----------
        stmt : str
            Statement to be executed by gnuplot repl
        end_string : str
            Terminal string for the current block.

        Returns
        -------
        out : bool
            True if stmt matches the end pattern of the current
            block and the matched word is equal to end_string.
        """
        pattern = self._blocks[self._current_block]["end_re"]
        if m := pattern.match(stmt):
            return m.group("end") == end_string
        return False

    def _start_of_block(self, stmt):
        """
        Detect the start of block statements

        Parameters
        ----------
        stmt : str
            Statement to be executed by gnuplot repl

        Returns
        -------
        block_type : str
            Name of the block that has been detected.
            Returns an empty string if none has been detected.
        end_string : str
            Terminal string for the block that has been detected.
            Returns an empty string if none has been detected.
        """
        # These are used to detect the end of the block
        block_type = NO_BLOCK
        end_string = ""
        for _type, regexps in self._blocks.items():
            if m := regexps["start_re"].match(stmt):
                block_type = _type
                end_string = m.group("end")
                break
        return block_type, end_string

    def _splitlines(self, code):
        """
        Split the code into lines that will be run

        Parameters
        ----------
        code : str
            Code to be executed.

        Returns
        -------
        lines : list of str
            Statements to send to gnuplot. A block is a single
            entry with its lines joined by newlines.

        Raises
        ------
        GnuplotError
            If the code ends while a block is still open.
        """
        # Statements in a block are not followed by a prompt, this
        # confuses the repl processing. We detect a block and concatenate
        # it into single line so that after executing the line we can
        # get a prompt.
        lines = []
        block_lines = []
        end_string = ""
        stmts = code.splitlines()
        for stmt in stmts:
            if self._current_block:
                block_lines.append(stmt)
                if self._end_of_block(stmt, end_string):
                    self._current_block = NO_BLOCK
                    # The empty entry makes the block end with a newline
                    block_lines.append("")
                    block = "\n".join(block_lines)
                    lines.append(block)
                    block_lines = []
                    end_string = ""
            else:
                block_name, end_string = self._start_of_block(stmt)
                if block_name:
                    self._current_block = block_name
                    block_lines.append(stmt)
                else:
                    lines.append(stmt)

        if self._current_block:
            msg = "Error: {} block not terminated correctly.".format(
                self._current_block
            )
            # Reset so that the next call does not start inside a block
            self._current_block = NO_BLOCK
            raise GnuplotError(msg)
        return lines

    def run_command(  # pyright: ignore[reportIncompatibleMethodOverride]
        self,
        command,
        timeout=-1,
        stream_handler=None,
        line_handler=None,
        stdin_handler=None,
    ):
        """
        Run code

        This overrides the baseclass method to allow for
        input validation and error handling.

        Parameters
        ----------
        command : str
            Code to run. It may contain more than one statement.
        timeout, stream_handler, line_handler, stdin_handler
            Accepted but not used by this implementation. The wait
            for each prompt uses the default of _force_prompt.

        Returns
        -------
        output : str
            The outputs of all the statements joined together.

        Raises
        ------
        GnuplotError
            If the input cannot be validated or split, if the prompt
            does not return, or if the output of a statement is
            recognised as error output.
        """
        command = self.validate_input(command)

        # Split up multiline commands and feed them in bit-by-bit
        stmts = self._splitlines(command)
        output_lines = []
        for line in stmts:
            self.send(line)
            self._force_prompt()

            # Removing any crlfs makes subsequent
            # processing cleaner
            retval = cast("str", self.child.before).replace(CRLF, "\n")
            self.prompt = self.child.after
            if self.is_error_output(retval):
                msg = "{}\n{}".format(line, textwrap.dedent(retval))
                raise GnuplotError(msg)

            # Sometimes block stmts like datablocks make the
            # the prompt leak into the return value
            retval = PROMPT_REMOVE_RE.sub("", retval).strip(" ")

            # Some gnuplot installations return the input statements
            # We do not count those as output
            if retval.strip() != line.strip():
                output_lines.append(retval)

        output = "".join(output_lines)
        return output