297 lines
9.7 KiB
Python
297 lines
9.7 KiB
Python
![]() |
#!/usr/bin/env python3
|
||
|
# Adapted from tappy copyright (c) 2016, Matt Layman
|
||
|
# MIT license
|
||
|
# https://github.com/python-tap/tappy
|
||
|
|
||
|
import io
|
||
|
import re
|
||
|
import subprocess
|
||
|
import sys
|
||
|
|
||
|
|
||
|
class Directive(object):
|
||
|
"""A representation of a result line directive."""
|
||
|
|
||
|
skip_pattern = re.compile(
|
||
|
r"""^SKIP\S*
|
||
|
(?P<whitespace>\s*) # Optional whitespace.
|
||
|
(?P<reason>.*) # Slurp up the rest.""",
|
||
|
re.IGNORECASE | re.VERBOSE)
|
||
|
todo_pattern = re.compile(
|
||
|
r"""^TODO\b # The directive name
|
||
|
(?P<whitespace>\s*) # Immediately following must be whitespace.
|
||
|
(?P<reason>.*) # Slurp up the rest.""",
|
||
|
re.IGNORECASE | re.VERBOSE)
|
||
|
|
||
|
def __init__(self, text):
|
||
|
"""Initialize the directive by parsing the text.
|
||
|
The text is assumed to be everything after a '#\s*' on a result line.
|
||
|
"""
|
||
|
self._text = text
|
||
|
self._skip = False
|
||
|
self._todo = False
|
||
|
self._reason = None
|
||
|
|
||
|
match = self.skip_pattern.match(text)
|
||
|
if match:
|
||
|
self._skip = True
|
||
|
self._reason = match.group('reason')
|
||
|
|
||
|
match = self.todo_pattern.match(text)
|
||
|
if match:
|
||
|
if match.group('whitespace'):
|
||
|
self._todo = True
|
||
|
else:
|
||
|
# Catch the case where the directive has no descriptive text.
|
||
|
if match.group('reason') == '':
|
||
|
self._todo = True
|
||
|
self._reason = match.group('reason')
|
||
|
|
||
|
@property
|
||
|
def text(self):
|
||
|
"""Get the entire text."""
|
||
|
return self._text
|
||
|
|
||
|
@property
|
||
|
def skip(self):
|
||
|
"""Check if the directive is a SKIP type."""
|
||
|
return self._skip
|
||
|
|
||
|
@property
|
||
|
def todo(self):
|
||
|
"""Check if the directive is a TODO type."""
|
||
|
return self._todo
|
||
|
|
||
|
@property
|
||
|
def reason(self):
|
||
|
"""Get the reason for the directive."""
|
||
|
return self._reason
|
||
|
|
||
|
|
||
|
class Parser(object):
|
||
|
"""A parser for TAP files and lines."""
|
||
|
|
||
|
# ok and not ok share most of the same characteristics.
|
||
|
result_base = r"""
|
||
|
\s* # Optional whitespace.
|
||
|
(?P<number>\d*) # Optional test number.
|
||
|
\s* # Optional whitespace.
|
||
|
(?P<description>[^#]*) # Optional description before #.
|
||
|
\#? # Optional directive marker.
|
||
|
\s* # Optional whitespace.
|
||
|
(?P<directive>.*) # Optional directive text.
|
||
|
"""
|
||
|
ok = re.compile(r'^ok' + result_base, re.VERBOSE)
|
||
|
not_ok = re.compile(r'^not\ ok' + result_base, re.VERBOSE)
|
||
|
plan = re.compile(r"""
|
||
|
^1..(?P<expected>\d+) # Match the plan details.
|
||
|
[^#]* # Consume any non-hash character to confirm only
|
||
|
# directives appear with the plan details.
|
||
|
\#? # Optional directive marker.
|
||
|
\s* # Optional whitespace.
|
||
|
(?P<directive>.*) # Optional directive text.
|
||
|
""", re.VERBOSE)
|
||
|
diagnostic = re.compile(r'^#')
|
||
|
bail = re.compile(r"""
|
||
|
^Bail\ out!
|
||
|
\s* # Optional whitespace.
|
||
|
(?P<reason>.*) # Optional reason.
|
||
|
""", re.VERBOSE)
|
||
|
version = re.compile(r'^TAP version (?P<version>\d+)$')
|
||
|
|
||
|
TAP_MINIMUM_DECLARED_VERSION = 13
|
||
|
|
||
|
def parse(self, fh):
|
||
|
"""Generate tap.line.Line objects, given a file-like object `fh`.
|
||
|
`fh` may be any object that implements both the iterator and
|
||
|
context management protocol (i.e. it can be used in both a
|
||
|
"with" statement and a "for...in" statement.)
|
||
|
Trailing whitespace and newline characters will be automatically
|
||
|
stripped from the input lines.
|
||
|
"""
|
||
|
with fh:
|
||
|
for line in fh:
|
||
|
yield self.parse_line(line.rstrip())
|
||
|
|
||
|
def parse_line(self, text):
|
||
|
"""Parse a line into whatever TAP category it belongs."""
|
||
|
match = self.ok.match(text)
|
||
|
if match:
|
||
|
return self._parse_result(True, match)
|
||
|
|
||
|
match = self.not_ok.match(text)
|
||
|
if match:
|
||
|
return self._parse_result(False, match)
|
||
|
|
||
|
if self.diagnostic.match(text):
|
||
|
return ('diagnostic', text)
|
||
|
|
||
|
match = self.plan.match(text)
|
||
|
if match:
|
||
|
return self._parse_plan(match)
|
||
|
|
||
|
match = self.bail.match(text)
|
||
|
if match:
|
||
|
return ('bail', match.group('reason'))
|
||
|
|
||
|
match = self.version.match(text)
|
||
|
if match:
|
||
|
return self._parse_version(match)
|
||
|
|
||
|
return ('unknown',)
|
||
|
|
||
|
def _parse_plan(self, match):
|
||
|
"""Parse a matching plan line."""
|
||
|
expected_tests = int(match.group('expected'))
|
||
|
directive = Directive(match.group('directive'))
|
||
|
|
||
|
# Only SKIP directives are allowed in the plan.
|
||
|
if directive.text and not directive.skip:
|
||
|
return ('unknown',)
|
||
|
|
||
|
return ('plan', expected_tests, directive)
|
||
|
|
||
|
def _parse_result(self, ok, match):
|
||
|
"""Parse a matching result line into a result instance."""
|
||
|
return ('result', ok, match.group('number'),
|
||
|
match.group('description').strip(),
|
||
|
Directive(match.group('directive')))
|
||
|
|
||
|
def _parse_version(self, match):
|
||
|
version = int(match.group('version'))
|
||
|
if version < self.TAP_MINIMUM_DECLARED_VERSION:
|
||
|
raise ValueError('It is an error to explicitly specify '
|
||
|
'any version lower than 13.')
|
||
|
return ('version', version)
|
||
|
|
||
|
|
||
|
class Rules(object):
|
||
|
|
||
|
def __init__(self):
|
||
|
self._lines_seen = {'plan': [], 'test': 0, 'failed': 0, 'version': []}
|
||
|
self._errors = []
|
||
|
|
||
|
def check(self, final_line_count):
|
||
|
"""Check the status of all provided data and update the suite."""
|
||
|
if self._lines_seen['version']:
|
||
|
self._process_version_lines()
|
||
|
self._process_plan_lines(final_line_count)
|
||
|
|
||
|
def check_errors(self):
|
||
|
if self._lines_seen['failed'] > 0:
|
||
|
self._add_error('Tests failed.')
|
||
|
if self._errors:
|
||
|
for error in self._errors:
|
||
|
print(error)
|
||
|
return 1
|
||
|
return 0
|
||
|
|
||
|
def _process_version_lines(self):
|
||
|
"""Process version line rules."""
|
||
|
if len(self._lines_seen['version']) > 1:
|
||
|
self._add_error('Multiple version lines appeared.')
|
||
|
elif self._lines_seen['version'][0] != 1:
|
||
|
self._add_error('The version must be on the first line.')
|
||
|
|
||
|
def _process_plan_lines(self, final_line_count):
|
||
|
"""Process plan line rules."""
|
||
|
if not self._lines_seen['plan']:
|
||
|
self._add_error('Missing a plan.')
|
||
|
return
|
||
|
|
||
|
if len(self._lines_seen['plan']) > 1:
|
||
|
self._add_error('Only one plan line is permitted per file.')
|
||
|
return
|
||
|
|
||
|
expected_tests, at_line = self._lines_seen['plan'][0]
|
||
|
if not self._plan_on_valid_line(at_line, final_line_count):
|
||
|
self._add_error(
|
||
|
'A plan must appear at the beginning or end of the file.')
|
||
|
return
|
||
|
|
||
|
if expected_tests != self._lines_seen['test']:
|
||
|
self._add_error(
|
||
|
'Expected {expected_count} tests '
|
||
|
'but only {seen_count} ran.'.format(
|
||
|
expected_count=expected_tests,
|
||
|
seen_count=self._lines_seen['test']))
|
||
|
|
||
|
def _plan_on_valid_line(self, at_line, final_line_count):
|
||
|
"""Check if a plan is on a valid line."""
|
||
|
# Put the common cases first.
|
||
|
if at_line == 1 or at_line == final_line_count:
|
||
|
return True
|
||
|
|
||
|
# The plan may only appear on line 2 if the version is at line 1.
|
||
|
after_version = (
|
||
|
self._lines_seen['version'] and
|
||
|
self._lines_seen['version'][0] == 1 and
|
||
|
at_line == 2)
|
||
|
if after_version:
|
||
|
return True
|
||
|
|
||
|
return False
|
||
|
|
||
|
def handle_bail(self, reason):
|
||
|
"""Handle a bail line."""
|
||
|
self._add_error('Bailed: {reason}').format(reason=reason)
|
||
|
|
||
|
def handle_skipping_plan(self):
|
||
|
"""Handle a plan that contains a SKIP directive."""
|
||
|
sys.exit(77)
|
||
|
|
||
|
def saw_plan(self, expected_tests, at_line):
|
||
|
"""Record when a plan line was seen."""
|
||
|
self._lines_seen['plan'].append((expected_tests, at_line))
|
||
|
|
||
|
def saw_test(self, ok):
|
||
|
"""Record when a test line was seen."""
|
||
|
self._lines_seen['test'] += 1
|
||
|
if not ok:
|
||
|
self._lines_seen['failed'] += 1
|
||
|
|
||
|
def saw_version_at(self, line_counter):
|
||
|
"""Record when a version line was seen."""
|
||
|
self._lines_seen['version'].append(line_counter)
|
||
|
|
||
|
def _add_error(self, message):
|
||
|
self._errors += [message]
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
parser = Parser()
|
||
|
rules = Rules()
|
||
|
|
||
|
try:
|
||
|
out = subprocess.check_output(sys.argv[1:], universal_newlines=True)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
sys.stdout.write(e.output)
|
||
|
raise e
|
||
|
|
||
|
line_generator = parser.parse(io.StringIO(out))
|
||
|
line_counter = 0
|
||
|
for line in line_generator:
|
||
|
line_counter += 1
|
||
|
|
||
|
if line[0] == 'unknown':
|
||
|
continue
|
||
|
|
||
|
if line[0] == 'result':
|
||
|
rules.saw_test(line[1])
|
||
|
print('{okay} {num} {description} {directive}'.format(
|
||
|
okay=('' if line[1] else 'not ') + 'ok', num=line[2],
|
||
|
description=line[3], directive=line[4].text))
|
||
|
elif line[0] == 'plan':
|
||
|
if line[2].skip:
|
||
|
rules.handle_skipping_plan()
|
||
|
rules.saw_plan(line[1], line_counter)
|
||
|
elif line[0] == 'bail':
|
||
|
rules.handle_bail(line[1])
|
||
|
elif line[0] == 'version':
|
||
|
rules.saw_version_at(line_counter)
|
||
|
elif line[0] == 'diagnostic':
|
||
|
print(line[1])
|
||
|
|
||
|
rules.check(line_counter)
|
||
|
sys.exit(rules.check_errors())
|