import re
import signal
import textwrap
from typing import cast
from metakernel import REPLWrapper
from metakernel.pexpect import TIMEOUT
from .exceptions import GnuplotError
# Line terminator found in the raw output of the child process;
# it is replaced with "\n" before the output is inspected.
CRLF = "\r\n"

# Value of the current-block marker when no block is being collected
NO_BLOCK = ""

# Patterns that identify error output. A line holding only a caret
# indicates an error on the line above it.
ERROR_RE = [re.compile(r"^\s*\^\s*\n")]

# Prompt at the end of the output, most likely "gnuplot> "
PROMPT_RE = re.compile(r"\w*>\s*$")

# Prompt anywhere in the output, used to strip leaked prompts
PROMPT_REMOVE_RE = re.compile(r"\w*>\s*")
# Data block e.g.
#     $DATA << EOD
#     # x y
#     1 1
#     2 2
#     3 3
#     EOD
#
# Both patterns capture the terminator word in a group named "end";
# the start pattern records which word closes the block and the end
# pattern is used to compare a candidate line against that word.
START_DATABLOCK_RE = re.compile(
    # $DATA << EOD
    r"^\$\w+\s+<<\s*(?P<end>\w+)$"
)
END_DATABLOCK_RE = re.compile(
    # EOD
    r"^(?P<end>\w+)$"
)
class GnuplotREPLWrapper(REPLWrapper):
    """
    REPL wrapper that feeds statements to a gnuplot process

    Multi-line blocks (e.g. datablocks) are joined and sent as a
    single statement, and output that is recognised as an error is
    raised as a GnuplotError.
    """

    # The prompt after the commands run
    prompt = ""

    # Recognised block types and the patterns that open & close them.
    # Both patterns must capture the terminator in a group named "end".
    _blocks = {
        "data": {"start_re": START_DATABLOCK_RE, "end_re": END_DATABLOCK_RE}
    }

    # Name of the block currently being collected, NO_BLOCK if none
    _current_block = NO_BLOCK

    def exit(self):
        """
        Exit the gnuplot process

        If a prompt cannot be obtained almost immediately, the child
        process is killed with SIGKILL; otherwise "exit" is sent to it.
        """
        try:
            self._force_prompt(timeout=0.01)
        except GnuplotError:
            return self.child.kill(signal.SIGKILL)

        self.sendline("exit")

    def is_error_output(self, text):
        """
        Return True if text is recognised as error text

        Parameters
        ----------
        text : str
            Output to test against the patterns in ERROR_RE.
        """
        return any(pattern.match(text) for pattern in ERROR_RE)

    def validate_input(self, code):
        """
        Deal with problematic input

        Backslash-newline line continuations are replaced with a
        single space.

        Parameters
        ----------
        code : str
            Code that is about to be executed.

        Returns
        -------
        code : str
            Code with line continuations joined.

        Raises
        ------
        GnuplotError
            If the code ends with a backslash.
        """
        if code.endswith("\\"):
            raise GnuplotError("Do not execute code that endswith backslash.")

        # Do not get stuck in the gnuplot process
        code = code.replace("\\\n", " ")
        return code

    def send(self, cmd):
        """
        Send cmd to the child process, terminated with a carriage return
        """
        self.child.send(cmd + "\r")

    def _force_prompt(self, timeout: float = 30, n=4):
        """
        Force prompt

        Make up to `n` short attempts to get a prompt and, if none
        of them succeeds, make one attempt with the full timeout.

        Parameters
        ----------
        timeout : float
            Seconds to wait in the final attempt. It also caps the
            time spent on each of the short attempts.
        n : int
            Number of short attempts.

        Raises
        ------
        GnuplotError
            If the prompt does not return within the final attempt.
        """
        quick_timeout = 0.05

        if timeout < quick_timeout:
            quick_timeout = timeout

        def quick_prompt():
            try:
                self._expect_prompt(timeout=quick_timeout)
                return True
            except TIMEOUT:
                return False

        def patient_prompt():
            try:
                self._expect_prompt(timeout=timeout)
                return True
            except TIMEOUT:
                return False

        # Eagerly try to get a prompt quickly,
        # If that fails wait a while
        for _ in range(n):
            if quick_prompt():
                break

            # Probably stuck in help output
            if self.child.before:
                self.send(self.child.linesep)
        else:
            # No quick attempt succeeded (the loop never hit break).
            # Probably long computation going on
            if not patient_prompt():
                msg = (
                    f"gnuplot prompt failed to return in {timeout} seconds"
                )
                raise GnuplotError(msg)

    def _end_of_block(self, stmt, end_string):
        """
        Detect the end of block statements

        Parameters
        ----------
        stmt : str
            Statement to be executed by gnuplot repl
        end_string : str
            Terminal string for the current block.

        Returns
        -------
        out : bool
            True if stmt matches the end pattern of the current block
            and its terminator is equal to end_string.
        """
        pattern = self._blocks[self._current_block]["end_re"]
        if m := pattern.match(stmt):
            return m.group("end") == end_string
        return False

    def _start_of_block(self, stmt):
        """
        Detect the start of block statements

        Parameters
        ----------
        stmt : str
            Statement to be executed by gnuplot repl

        Returns
        -------
        block_type : str
            Name of the block that has been detected.
            Returns an empty string if none has been detected.
        end_string : str
            Terminal string for the block that has been detected.
            Returns an empty string if none has been detected.
        """
        # These are used to detect the end of the block
        block_type = NO_BLOCK
        end_string = ""
        for _type, regexps in self._blocks.items():
            if m := regexps["start_re"].match(stmt):
                block_type = _type
                end_string = m.group("end")
                break
        return block_type, end_string

    def _splitlines(self, code):
        """
        Split the code into lines that will be run

        Parameters
        ----------
        code : str
            Code to be split.

        Returns
        -------
        lines : list of str
            Statements to send one at a time. A complete block is a
            single entry with its lines joined by newlines and a
            trailing newline.

        Raises
        ------
        GnuplotError
            If the code ends while a block is still open.
        """
        # Statements in a block are not followed by a prompt, this
        # confuses the repl processing. We detect a block and concatenate
        # it into single line so that after executing the line we can
        # get a prompt.
        lines = []
        block_lines = []
        end_string = ""
        stmts = code.splitlines()
        for stmt in stmts:
            if self._current_block:
                block_lines.append(stmt)
                if self._end_of_block(stmt, end_string):
                    self._current_block = NO_BLOCK
                    # The empty entry gives the joined block a
                    # trailing newline
                    block_lines.append("")
                    block = "\n".join(block_lines)
                    lines.append(block)
                    block_lines = []
                    end_string = ""
            else:
                block_name, end_string = self._start_of_block(stmt)
                if block_name:
                    self._current_block = block_name
                    block_lines.append(stmt)
                else:
                    lines.append(stmt)

        if self._current_block:
            msg = f"Error: {self._current_block} block not terminated correctly."
            # Reset so that the next call does not start inside a block
            self._current_block = NO_BLOCK
            raise GnuplotError(msg)

        return lines

    def run_command(  # pyright: ignore[reportIncompatibleMethodOverride]
        self,
        command,
        timeout=-1,
        stream_handler=None,
        line_handler=None,
        stdin_handler=None,
    ):
        """
        Run code

        This overrides the baseclass method to allow for
        input validation and error handling.

        Parameters
        ----------
        command : str
            Code to run. It may hold many statements and blocks.
        timeout, stream_handler, line_handler, stdin_handler
            Accepted as part of the signature but not used by this
            method; every statement waits with the default timeout
            of `_force_prompt`.

        Returns
        -------
        output : str
            Concatenated output of the statements. Output that is
            only an echo of the statement itself is left out.

        Raises
        ------
        GnuplotError
            If the input cannot be validated, a block is not
            terminated, the prompt does not return or the output of
            a statement is recognised as an error.
        """
        command = self.validate_input(command)

        # Split up multiline commands and feed them in bit-by-bit
        stmts = self._splitlines(command)
        output_lines = []
        for line in stmts:
            self.send(line)
            self._force_prompt()

            # Removing any crlfs makes subsequent
            # processing cleaner
            retval = cast("str", self.child.before).replace(CRLF, "\n")
            self.prompt = self.child.after
            if self.is_error_output(retval):
                msg = f"{line}\n{textwrap.dedent(retval)}"
                raise GnuplotError(msg)

            # Sometimes block stmts like datablocks make
            # the prompt leak into the return value
            retval = PROMPT_REMOVE_RE.sub("", retval).strip(" ")

            # Some gnuplot installations return the input statements
            # We do not count those as output
            if retval.strip() != line.strip():
                output_lines.append(retval)

        output = "".join(output_lines)
        return output