Separate code and data of outcome analysis

Place the code of outcome analysis (auxiliary functions, tasks, command line
entry point) into a separate module, which will be moved to the
version-independent framework repository so that it can be shared between
maintained branches. Keep the branch-specific list of driver components and
ignore lists in the per-repository script.

We keep the executable script at `tests/scripts/analyze_outcomes.py`. It's
simpler that way, because that path is hard-coded in CI scripts.

Signed-off-by: Gilles Peskine <Gilles.Peskine@arm.com>
Author: Gilles Peskine
Date: 2024-10-03 18:42:37 +02:00
Parent: e41cde57c3
Commit: 082eadef4e
2 changed files with 368 additions and 360 deletions
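
In outline, the split leaves tests/scripts/analyze_outcomes.py with only the branch-specific data (subclasses carrying REFERENCE/DRIVER component names and ignore lists, plus the KNOWN_TASKS table), while the generic machinery (Results, Task, CoverageTask, DriverVSReference, read_outcome_file, main) moves to the shared module. A minimal sketch of what the per-repository script reduces to; names are taken from the diff below, and the ignore lists and task table are abridged:

# Sketch only: a per-repository analyze_outcomes.py after the split.
import outcome_analysis

class CoverageTask(outcome_analysis.CoverageTask):
    pass # branch-specific IGNORED_TESTS will be populated here

class DriverVSReference_block_cipher_dispatch(outcome_analysis.DriverVSReference):
    REFERENCE = 'test_full_block_cipher_legacy_dispatch'
    DRIVER = 'test_full_block_cipher_psa_dispatch'
    IGNORED_SUITES = []  # abridged; see the real lists in the diff below
    IGNORED_TESTS = {}   # abridged

KNOWN_TASKS = {
    'analyze_coverage': CoverageTask,
    'analyze_block_cipher_dispatch': DriverVSReference_block_cipher_dispatch,
    # ... one entry per driver-vs-reference comparison ...
}

if __name__ == '__main__':
    outcome_analysis.main(KNOWN_TASKS)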


@@ -6,279 +6,13 @@ This script can also run on outcomes from a partial run, but the results are
less likely to be useful.
"""
import argparse
import sys
import traceback
import re
import subprocess
import os
import typing
-import collect_test_cases
+import outcome_analysis
+class CoverageTask(outcome_analysis.CoverageTask):
+    pass # We'll populate IGNORED_TESTS soon
# `ComponentOutcomes` is a named tuple which is defined as:
# ComponentOutcomes(
# successes = {
# "<suite_case>",
# ...
# },
# failures = {
# "<suite_case>",
# ...
# }
# )
# suite_case = "<suite>;<case>"
ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
[('successes', typing.Set[str]),
('failures', typing.Set[str])])
# `Outcomes` is a representation of the outcomes file,
# which defined as:
# Outcomes = {
# "<component>": ComponentOutcomes,
# ...
# }
Outcomes = typing.Dict[str, ComponentOutcomes]
class Results:
"""Process analysis results."""
def __init__(self):
self.error_count = 0
self.warning_count = 0
def new_section(self, fmt, *args, **kwargs):
self._print_line('\n*** ' + fmt + ' ***\n', *args, **kwargs)
def info(self, fmt, *args, **kwargs):
self._print_line('Info: ' + fmt, *args, **kwargs)
def error(self, fmt, *args, **kwargs):
self.error_count += 1
self._print_line('Error: ' + fmt, *args, **kwargs)
def warning(self, fmt, *args, **kwargs):
self.warning_count += 1
self._print_line('Warning: ' + fmt, *args, **kwargs)
@staticmethod
def _print_line(fmt, *args, **kwargs):
sys.stderr.write((fmt + '\n').format(*args, **kwargs))
def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \
outcome_file: str) -> None:
"""Run the tests specified in ref_component and driver_component. Results
are stored in the output_file and they will be used for the following
coverage analysis"""
results.new_section("Test {} and {}", ref_component, driver_component)
shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
" " + ref_component + " " + driver_component
results.info("Running: {}", shell_command)
ret_val = subprocess.run(shell_command.split(), check=False).returncode
if ret_val != 0:
results.error("failed to run reference/driver components")
IgnoreEntry = typing.Union[str, typing.Pattern]
def name_matches_pattern(name: str, str_or_re: IgnoreEntry) -> bool:
"""Check if name matches a pattern, that may be a string or regex.
- If the pattern is a string, name must be equal to match.
- If the pattern is a regex, name must fully match.
"""
# The CI's python is too old for re.Pattern
#if isinstance(str_or_re, re.Pattern):
if not isinstance(str_or_re, str):
return str_or_re.fullmatch(name) is not None
else:
return str_or_re == name
def read_outcome_file(outcome_file: str) -> Outcomes:
"""Parse an outcome file and return an outcome collection.
"""
outcomes = {}
with open(outcome_file, 'r', encoding='utf-8') as input_file:
for line in input_file:
(_platform, component, suite, case, result, _cause) = line.split(';')
# Note that `component` is not unique. If a test case passes on Linux
# and fails on FreeBSD, it'll end up in both the successes set and
# the failures set.
suite_case = ';'.join([suite, case])
if component not in outcomes:
outcomes[component] = ComponentOutcomes(set(), set())
if result == 'PASS':
outcomes[component].successes.add(suite_case)
elif result == 'FAIL':
outcomes[component].failures.add(suite_case)
return outcomes
class Task:
"""Base class for outcome analysis tasks."""
# Override the following in child classes.
# Map test suite names (with the test_suite_prefix) to a list of ignored
# test cases. Each element in the list can be either a string or a regex;
# see the `name_matches_pattern` function.
IGNORED_TESTS = {} #type: typing.Dict[str, typing.List[IgnoreEntry]]
def __init__(self, options) -> None:
"""Pass command line options to the tasks.
Each task decides which command line options it cares about.
"""
pass
def section_name(self) -> str:
"""The section name to use in results."""
raise NotImplementedError
def ignored_tests(self, test_suite: str) -> typing.Iterator[IgnoreEntry]:
"""Generate the ignore list for the specified test suite."""
if test_suite in self.IGNORED_TESTS:
yield from self.IGNORED_TESTS[test_suite]
pos = test_suite.find('.')
if pos != -1:
base_test_suite = test_suite[:pos]
if base_test_suite in self.IGNORED_TESTS:
yield from self.IGNORED_TESTS[base_test_suite]
def is_test_case_ignored(self, test_suite: str, test_string: str) -> bool:
"""Check if the specified test case is ignored."""
for str_or_re in self.ignored_tests(test_suite):
if name_matches_pattern(test_string, str_or_re):
return True
return False
def run(self, results: Results, outcomes: Outcomes):
"""Run the analysis on the specified outcomes.
Signal errors via the results objects
"""
raise NotImplementedError
class CoverageTask(Task):
"""Analyze test coverage."""
# Test cases whose suite and description are matched by an entry in
# IGNORED_TESTS are expected to be never executed.
# All other test cases are expected to be executed at least once.
def __init__(self, options) -> None:
super().__init__(options)
self.full_coverage = options.full_coverage #type: bool
@staticmethod
def section_name() -> str:
return "Analyze coverage"
def run(self, results: Results, outcomes: Outcomes) -> None:
"""Check that all available test cases are executed at least once."""
# Make sure that the generated data files are present (and up-to-date).
# This allows analyze_outcomes.py to run correctly on a fresh Git
# checkout.
cp = subprocess.run(['make', 'generated_files'],
cwd='tests',
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
check=False)
if cp.returncode != 0:
sys.stderr.write(cp.stdout.decode('utf-8'))
results.error("Failed \"make generated_files\" in tests. "
"Coverage analysis may be incorrect.")
available = collect_test_cases.collect_available_test_cases()
for suite_case in available:
hit = any(suite_case in comp_outcomes.successes or
suite_case in comp_outcomes.failures
for comp_outcomes in outcomes.values())
(test_suite, test_description) = suite_case.split(';')
ignored = self.is_test_case_ignored(test_suite, test_description)
if not hit and not ignored:
if self.full_coverage:
results.error('Test case not executed: {}', suite_case)
else:
results.warning('Test case not executed: {}', suite_case)
elif hit and ignored:
# If a test case is no longer always skipped, we should remove
# it from the ignore list.
if self.full_coverage:
results.error('Test case was executed but marked as ignored for coverage: {}',
suite_case)
else:
results.warning('Test case was executed but marked as ignored for coverage: {}',
suite_case)
class DriverVSReference(Task):
"""Compare outcomes from testing with and without a driver.
There are 2 options to use analyze_driver_vs_reference_xxx locally:
1. Run tests and then analysis:
- tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver>
- tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
2. Let this script run both automatically:
- tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
"""
# Override the following in child classes.
# Configuration name (all.sh component) used as the reference.
REFERENCE = ''
# Configuration name (all.sh component) used as the driver.
DRIVER = ''
# Ignored test suites (without the test_suite_ prefix).
IGNORED_SUITES = [] #type: typing.List[str]
def __init__(self, options) -> None:
super().__init__(options)
self.ignored_suites = frozenset('test_suite_' + x
for x in self.IGNORED_SUITES)
def section_name(self) -> str:
return f"Analyze driver {self.DRIVER} vs reference {self.REFERENCE}"
def run(self, results: Results, outcomes: Outcomes) -> None:
"""Check that all tests passing in the driver component are also
passing in the corresponding reference component.
Skip:
- full test suites provided in ignored_suites list
- only some specific test inside a test suite, for which the corresponding
output string is provided
"""
ref_outcomes = outcomes.get("component_" + self.REFERENCE)
driver_outcomes = outcomes.get("component_" + self.DRIVER)
if ref_outcomes is None or driver_outcomes is None:
results.error("required components are missing: bad outcome file?")
return
if not ref_outcomes.successes:
results.error("no passing test in reference component: bad outcome file?")
return
for suite_case in ref_outcomes.successes:
# suite_case is like "test_suite_foo.bar;Description of test case"
(full_test_suite, test_string) = suite_case.split(';')
test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name
# Immediately skip fully-ignored test suites
if test_suite in self.ignored_suites or \
full_test_suite in self.ignored_suites:
continue
# For ignored test cases inside test suites, just remember and:
# don't issue an error if they're skipped with drivers,
# but issue an error if they're not (means we have a bad entry).
ignored = self.is_test_case_ignored(full_test_suite, test_string)
if not ignored and not suite_case in driver_outcomes.successes:
results.error("SKIP/FAIL -> PASS: {}", suite_case)
if ignored and suite_case in driver_outcomes.successes:
results.error("uselessly ignored: {}", suite_case)
# The names that we give to classes derived from DriverVSReference do not
@@ -288,7 +22,7 @@ class DriverVSReference(Task):
# documentation.
#pylint: disable=invalid-name,missing-class-docstring
-class DriverVSReference_hash(DriverVSReference):
+class DriverVSReference_hash(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_hash_use_psa'
DRIVER = 'test_psa_crypto_config_accel_hash_use_psa'
IGNORED_SUITES = [
@@ -308,7 +42,7 @@ class DriverVSReference_hash(DriverVSReference):
],
}
-class DriverVSReference_hmac(DriverVSReference):
+class DriverVSReference_hmac(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_hmac'
DRIVER = 'test_psa_crypto_config_accel_hmac'
IGNORED_SUITES = [
@@ -347,7 +81,7 @@ class DriverVSReference_hmac(DriverVSReference):
],
}
-class DriverVSReference_cipher_aead_cmac(DriverVSReference):
+class DriverVSReference_cipher_aead_cmac(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_cipher_aead_cmac'
DRIVER = 'test_psa_crypto_config_accel_cipher_aead_cmac'
# Modules replaced by drivers.
@@ -414,7 +148,7 @@ class DriverVSReference_cipher_aead_cmac(DriverVSReference):
],
}
-class DriverVSReference_ecp_light_only(DriverVSReference):
+class DriverVSReference_ecp_light_only(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_ecc_ecp_light_only'
DRIVER = 'test_psa_crypto_config_accel_ecc_ecp_light_only'
IGNORED_SUITES = [
@@ -454,7 +188,7 @@ class DriverVSReference_ecp_light_only(DriverVSReference):
],
}
-class DriverVSReference_no_ecp_at_all(DriverVSReference):
+class DriverVSReference_no_ecp_at_all(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_ecc_no_ecp_at_all'
DRIVER = 'test_psa_crypto_config_accel_ecc_no_ecp_at_all'
IGNORED_SUITES = [
@@ -492,7 +226,7 @@ class DriverVSReference_no_ecp_at_all(DriverVSReference):
],
}
-class DriverVSReference_ecc_no_bignum(DriverVSReference):
+class DriverVSReference_ecc_no_bignum(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_ecc_no_bignum'
DRIVER = 'test_psa_crypto_config_accel_ecc_no_bignum'
IGNORED_SUITES = [
@@ -537,7 +271,7 @@ class DriverVSReference_ecc_no_bignum(DriverVSReference):
],
}
-class DriverVSReference_ecc_ffdh_no_bignum(DriverVSReference):
+class DriverVSReference_ecc_ffdh_no_bignum(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_ecc_ffdh_no_bignum'
DRIVER = 'test_psa_crypto_config_accel_ecc_ffdh_no_bignum'
IGNORED_SUITES = [
@@ -590,7 +324,7 @@ class DriverVSReference_ecc_ffdh_no_bignum(DriverVSReference):
],
}
-class DriverVSReference_ffdh_alg(DriverVSReference):
+class DriverVSReference_ffdh_alg(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_ffdh'
DRIVER = 'test_psa_crypto_config_accel_ffdh'
IGNORED_SUITES = ['dhm']
@@ -606,7 +340,7 @@ class DriverVSReference_ffdh_alg(DriverVSReference):
],
}
-class DriverVSReference_tfm_config(DriverVSReference):
+class DriverVSReference_tfm_config(outcome_analysis.DriverVSReference):
REFERENCE = 'test_tfm_config_no_p256m'
DRIVER = 'test_tfm_config_p256m_driver_accel_ec'
IGNORED_SUITES = [
@@ -638,7 +372,7 @@ class DriverVSReference_tfm_config(DriverVSReference):
],
}
-class DriverVSReference_rsa(DriverVSReference):
+class DriverVSReference_rsa(outcome_analysis.DriverVSReference):
REFERENCE = 'test_psa_crypto_config_reference_rsa_crypto'
DRIVER = 'test_psa_crypto_config_accel_rsa_crypto'
IGNORED_SUITES = [
@@ -677,7 +411,7 @@ class DriverVSReference_rsa(DriverVSReference):
],
}
-class DriverVSReference_block_cipher_dispatch(DriverVSReference):
+class DriverVSReference_block_cipher_dispatch(outcome_analysis.DriverVSReference):
REFERENCE = 'test_full_block_cipher_legacy_dispatch'
DRIVER = 'test_full_block_cipher_psa_dispatch'
IGNORED_SUITES = [
@@ -744,7 +478,6 @@ class DriverVSReference_block_cipher_dispatch(DriverVSReference):
],
}
#pylint: enable=invalid-name,missing-class-docstring
# List of tasks with a function that can handle this task and additional arguments if required
KNOWN_TASKS = {
'analyze_coverage': CoverageTask,
@@ -761,83 +494,5 @@ KNOWN_TASKS = {
'analyze_block_cipher_dispatch': DriverVSReference_block_cipher_dispatch,
}
def main(known_tasks: typing.Dict[str, typing.Type[Task]]) -> None:
main_results = Results()
try:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
help='Outcome file to analyze')
parser.add_argument('specified_tasks', default='all', nargs='?',
help='Analysis to be done. By default, run all tasks. '
'With one or more TASK, run only those. '
'TASK can be the name of a single task or '
'comma/space-separated list of tasks. ')
parser.add_argument('--list', action='store_true',
help='List all available tasks and exit.')
parser.add_argument('--require-full-coverage', action='store_true',
dest='full_coverage', help="Require all available "
"test cases to be executed and issue an error "
"otherwise. This flag is ignored if 'task' is "
"neither 'all' nor 'analyze_coverage'")
options = parser.parse_args()
if options.list:
for task_name in known_tasks:
print(task_name)
sys.exit(0)
if options.specified_tasks == 'all':
tasks_list = list(known_tasks.keys())
else:
tasks_list = re.split(r'[, ]+', options.specified_tasks)
for task_name in tasks_list:
if task_name not in known_tasks:
sys.stderr.write('invalid task: {}\n'.format(task_name))
sys.exit(2)
# If the outcome file exists, parse it once and share the result
# among tasks to improve performance.
# Otherwise, it will be generated by execute_reference_driver_tests.
if not os.path.exists(options.outcomes):
if len(tasks_list) > 1:
sys.stderr.write("mutiple tasks found, please provide a valid outcomes file.\n")
sys.exit(2)
task_name = tasks_list[0]
task_class = known_tasks[task_name]
if not issubclass(task_class, DriverVSReference):
sys.stderr.write("please provide valid outcomes file for {}.\n".format(task_name))
sys.exit(2)
# mypy isn't smart enough to know that REFERENCE and DRIVER
# are *class* attributes of all classes derived from
# DriverVSReference. (It would be smart enough if we had an
# instance of task_class, but we can't construct an instance
# until we have the outcome data, so at this point we only
# have the class.) So we use indirection to access the class
# attributes.
execute_reference_driver_tests(main_results,
getattr(task_class, 'REFERENCE'),
getattr(task_class, 'DRIVER'),
options.outcomes)
outcomes = read_outcome_file(options.outcomes)
for task_name in tasks_list:
task_constructor = known_tasks[task_name]
task_instance = task_constructor(options)
main_results.new_section(task_instance.section_name())
task_instance.run(main_results, outcomes)
main_results.info("Overall results: {} warnings and {} errors",
main_results.warning_count, main_results.error_count)
sys.exit(0 if (main_results.error_count == 0) else 1)
except Exception: # pylint: disable=broad-except
# Print the backtrace and exit explicitly with our chosen status.
traceback.print_exc()
sys.exit(120)
if __name__ == '__main__':
-    main(KNOWN_TASKS)
+    outcome_analysis.main(KNOWN_TASKS)


@@ -7,3 +7,356 @@ the classes with branch-specific customizations such as ignore lists.
# Copyright The Mbed TLS Contributors
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
import argparse
import sys
import traceback
import re
import subprocess
import os
import typing
import collect_test_cases
# `ComponentOutcomes` is a named tuple which is defined as:
# ComponentOutcomes(
# successes = {
# "<suite_case>",
# ...
# },
# failures = {
# "<suite_case>",
# ...
# }
# )
# suite_case = "<suite>;<case>"
ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
[('successes', typing.Set[str]),
('failures', typing.Set[str])])
# `Outcomes` is a representation of the outcomes file,
# which is defined as:
# Outcomes = {
# "<component>": ComponentOutcomes,
# ...
# }
Outcomes = typing.Dict[str, ComponentOutcomes]
class Results:
"""Process analysis results."""
def __init__(self):
self.error_count = 0
self.warning_count = 0
def new_section(self, fmt, *args, **kwargs):
self._print_line('\n*** ' + fmt + ' ***\n', *args, **kwargs)
def info(self, fmt, *args, **kwargs):
self._print_line('Info: ' + fmt, *args, **kwargs)
def error(self, fmt, *args, **kwargs):
self.error_count += 1
self._print_line('Error: ' + fmt, *args, **kwargs)
def warning(self, fmt, *args, **kwargs):
self.warning_count += 1
self._print_line('Warning: ' + fmt, *args, **kwargs)
@staticmethod
def _print_line(fmt, *args, **kwargs):
sys.stderr.write((fmt + '\n').format(*args, **kwargs))
def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \
outcome_file: str) -> None:
"""Run the tests specified in ref_component and driver_component. Results
are stored in the output_file and they will be used for the following
coverage analysis"""
results.new_section("Test {} and {}", ref_component, driver_component)
shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
" " + ref_component + " " + driver_component
results.info("Running: {}", shell_command)
ret_val = subprocess.run(shell_command.split(), check=False).returncode
if ret_val != 0:
results.error("failed to run reference/driver components")
IgnoreEntry = typing.Union[str, typing.Pattern]
def name_matches_pattern(name: str, str_or_re: IgnoreEntry) -> bool:
"""Check if name matches a pattern, that may be a string or regex.
- If the pattern is a string, name must be equal to match.
- If the pattern is a regex, name must fully match.
"""
# The CI's python is too old for re.Pattern
#if isinstance(str_or_re, re.Pattern):
if not isinstance(str_or_re, str):
return str_or_re.fullmatch(name) is not None
else:
return str_or_re == name
def read_outcome_file(outcome_file: str) -> Outcomes:
"""Parse an outcome file and return an outcome collection.
"""
outcomes = {}
with open(outcome_file, 'r', encoding='utf-8') as input_file:
for line in input_file:
(_platform, component, suite, case, result, _cause) = line.split(';')
# Note that `component` is not unique. If a test case passes on Linux
# and fails on FreeBSD, it'll end up in both the successes set and
# the failures set.
suite_case = ';'.join([suite, case])
if component not in outcomes:
outcomes[component] = ComponentOutcomes(set(), set())
if result == 'PASS':
outcomes[component].successes.add(suite_case)
elif result == 'FAIL':
outcomes[component].failures.add(suite_case)
return outcomes
class Task:
"""Base class for outcome analysis tasks."""
# Override the following in child classes.
# Map test suite names (with the test_suite_prefix) to a list of ignored
# test cases. Each element in the list can be either a string or a regex;
# see the `name_matches_pattern` function.
IGNORED_TESTS = {} #type: typing.Dict[str, typing.List[IgnoreEntry]]
def __init__(self, options) -> None:
"""Pass command line options to the tasks.
Each task decides which command line options it cares about.
"""
pass
def section_name(self) -> str:
"""The section name to use in results."""
raise NotImplementedError
def ignored_tests(self, test_suite: str) -> typing.Iterator[IgnoreEntry]:
"""Generate the ignore list for the specified test suite."""
if test_suite in self.IGNORED_TESTS:
yield from self.IGNORED_TESTS[test_suite]
pos = test_suite.find('.')
if pos != -1:
base_test_suite = test_suite[:pos]
if base_test_suite in self.IGNORED_TESTS:
yield from self.IGNORED_TESTS[base_test_suite]
def is_test_case_ignored(self, test_suite: str, test_string: str) -> bool:
"""Check if the specified test case is ignored."""
for str_or_re in self.ignored_tests(test_suite):
if name_matches_pattern(test_string, str_or_re):
return True
return False
def run(self, results: Results, outcomes: Outcomes):
"""Run the analysis on the specified outcomes.
Signal errors via the results objects
"""
raise NotImplementedError
class CoverageTask(Task):
"""Analyze test coverage."""
# Test cases whose suite and description are matched by an entry in
# IGNORED_TESTS are expected to be never executed.
# All other test cases are expected to be executed at least once.
def __init__(self, options) -> None:
super().__init__(options)
self.full_coverage = options.full_coverage #type: bool
@staticmethod
def section_name() -> str:
return "Analyze coverage"
def run(self, results: Results, outcomes: Outcomes) -> None:
"""Check that all available test cases are executed at least once."""
# Make sure that the generated data files are present (and up-to-date).
# This allows analyze_outcomes.py to run correctly on a fresh Git
# checkout.
cp = subprocess.run(['make', 'generated_files'],
cwd='tests',
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
check=False)
if cp.returncode != 0:
sys.stderr.write(cp.stdout.decode('utf-8'))
results.error("Failed \"make generated_files\" in tests. "
"Coverage analysis may be incorrect.")
available = collect_test_cases.collect_available_test_cases()
for suite_case in available:
hit = any(suite_case in comp_outcomes.successes or
suite_case in comp_outcomes.failures
for comp_outcomes in outcomes.values())
(test_suite, test_description) = suite_case.split(';')
ignored = self.is_test_case_ignored(test_suite, test_description)
if not hit and not ignored:
if self.full_coverage:
results.error('Test case not executed: {}', suite_case)
else:
results.warning('Test case not executed: {}', suite_case)
elif hit and ignored:
# If a test case is no longer always skipped, we should remove
# it from the ignore list.
if self.full_coverage:
results.error('Test case was executed but marked as ignored for coverage: {}',
suite_case)
else:
results.warning('Test case was executed but marked as ignored for coverage: {}',
suite_case)
class DriverVSReference(Task):
"""Compare outcomes from testing with and without a driver.
There are 2 options to use analyze_driver_vs_reference_xxx locally:
1. Run tests and then analysis:
- tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver>
- tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
2. Let this script run both automatically:
- tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
"""
# Override the following in child classes.
# Configuration name (all.sh component) used as the reference.
REFERENCE = ''
# Configuration name (all.sh component) used as the driver.
DRIVER = ''
# Ignored test suites (without the test_suite_ prefix).
IGNORED_SUITES = [] #type: typing.List[str]
def __init__(self, options) -> None:
super().__init__(options)
self.ignored_suites = frozenset('test_suite_' + x
for x in self.IGNORED_SUITES)
def section_name(self) -> str:
return f"Analyze driver {self.DRIVER} vs reference {self.REFERENCE}"
def run(self, results: Results, outcomes: Outcomes) -> None:
"""Check that all tests passing in the driver component are also
passing in the corresponding reference component.
Skip:
- full test suites provided in ignored_suites list
- only some specific test inside a test suite, for which the corresponding
output string is provided
"""
ref_outcomes = outcomes.get("component_" + self.REFERENCE)
driver_outcomes = outcomes.get("component_" + self.DRIVER)
if ref_outcomes is None or driver_outcomes is None:
results.error("required components are missing: bad outcome file?")
return
if not ref_outcomes.successes:
results.error("no passing test in reference component: bad outcome file?")
return
for suite_case in ref_outcomes.successes:
# suite_case is like "test_suite_foo.bar;Description of test case"
(full_test_suite, test_string) = suite_case.split(';')
test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name
# Immediately skip fully-ignored test suites
if test_suite in self.ignored_suites or \
full_test_suite in self.ignored_suites:
continue
# For ignored test cases inside test suites, just remember and:
# don't issue an error if they're skipped with drivers,
# but issue an error if they're not (means we have a bad entry).
ignored = self.is_test_case_ignored(full_test_suite, test_string)
if not ignored and not suite_case in driver_outcomes.successes:
results.error("SKIP/FAIL -> PASS: {}", suite_case)
if ignored and suite_case in driver_outcomes.successes:
results.error("uselessly ignored: {}", suite_case)
def main(known_tasks: typing.Dict[str, typing.Type[Task]]) -> None:
main_results = Results()
try:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
help='Outcome file to analyze')
parser.add_argument('specified_tasks', default='all', nargs='?',
help='Analysis to be done. By default, run all tasks. '
'With one or more TASK, run only those. '
'TASK can be the name of a single task or '
'comma/space-separated list of tasks. ')
parser.add_argument('--list', action='store_true',
help='List all available tasks and exit.')
parser.add_argument('--require-full-coverage', action='store_true',
dest='full_coverage', help="Require all available "
"test cases to be executed and issue an error "
"otherwise. This flag is ignored if 'task' is "
"neither 'all' nor 'analyze_coverage'")
options = parser.parse_args()
if options.list:
for task_name in known_tasks:
print(task_name)
sys.exit(0)
if options.specified_tasks == 'all':
tasks_list = list(known_tasks.keys())
else:
tasks_list = re.split(r'[, ]+', options.specified_tasks)
for task_name in tasks_list:
if task_name not in known_tasks:
sys.stderr.write('invalid task: {}\n'.format(task_name))
sys.exit(2)
# If the outcome file exists, parse it once and share the result
# among tasks to improve performance.
# Otherwise, it will be generated by execute_reference_driver_tests.
if not os.path.exists(options.outcomes):
if len(tasks_list) > 1:
sys.stderr.write("mutiple tasks found, please provide a valid outcomes file.\n")
sys.exit(2)
task_name = tasks_list[0]
task_class = known_tasks[task_name]
if not issubclass(task_class, DriverVSReference):
sys.stderr.write("please provide valid outcomes file for {}.\n".format(task_name))
sys.exit(2)
# mypy isn't smart enough to know that REFERENCE and DRIVER
# are *class* attributes of all classes derived from
# DriverVSReference. (It would be smart enough if we had an
# instance of task_class, but we can't construct an instance
# until we have the outcome data, so at this point we only
# have the class.) So we use indirection to access the class
# attributes.
execute_reference_driver_tests(main_results,
getattr(task_class, 'REFERENCE'),
getattr(task_class, 'DRIVER'),
options.outcomes)
outcomes = read_outcome_file(options.outcomes)
for task_name in tasks_list:
task_constructor = known_tasks[task_name]
task_instance = task_constructor(options)
main_results.new_section(task_instance.section_name())
task_instance.run(main_results, outcomes)
main_results.info("Overall results: {} warnings and {} errors",
main_results.warning_count, main_results.error_count)
sys.exit(0 if (main_results.error_count == 0) else 1)
except Exception: # pylint: disable=broad-except
# Print the backtrace and exit explicitly with our chosen status.
traceback.print_exc()
sys.exit(120)