#!/usr/bin/env python3 """Analyze the test outcomes from a full CI run. This script can also run on outcomes from a partial run, but the results are less likely to be useful. """ import argparse import re import sys import traceback import check_test_cases class Results: """Process analysis results.""" def __init__(self): self.error_count = 0 self.warning_count = 0 @staticmethod def log(fmt, *args, **kwargs): sys.stderr.write((fmt + '\n').format(*args, **kwargs)) def error(self, fmt, *args, **kwargs): self.log('Error: ' + fmt, *args, **kwargs) self.error_count += 1 def warning(self, fmt, *args, **kwargs): self.log('Warning: ' + fmt, *args, **kwargs) self.warning_count += 1 class TestCaseOutcomes: """The outcomes of one test case across many configurations.""" # pylint: disable=too-few-public-methods def __init__(self): self.successes = [] self.failures = [] def hits(self): """Return the number of times a test case has been run. This includes passes and failures, but not skips. """ return len(self.successes) + len(self.failures) class TestDescriptions(check_test_cases.TestDescriptionExplorer): """Collect the available test cases.""" def __init__(self): super().__init__() self.descriptions = set() def process_test_case(self, _per_file_state, file_name, _line_number, description): """Record an available test case.""" base_name = re.sub(r'\.[^.]*$', '', re.sub(r'.*/', '', file_name)) key = ';'.join([base_name, description.decode('utf-8')]) self.descriptions.add(key) def collect_available_test_cases(): """Collect the available test cases.""" explorer = TestDescriptions() explorer.walk_all() return sorted(explorer.descriptions) def analyze_coverage(results, outcomes): """Check that all available test cases are executed at least once.""" available = collect_available_test_cases() for key in available: hits = outcomes[key].hits() if key in outcomes else 0 if hits == 0: # Make this a warning, not an error, as long as we haven't # fixed this branch to have full coverage of test cases. results.warning('Test case not executed: {}', key) def analyze_outcomes(outcomes): """Run all analyses on the given outcome collection.""" results = Results() analyze_coverage(results, outcomes) return results def read_outcome_file(outcome_file): """Parse an outcome file and return an outcome collection. An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects. The keys are the test suite name and the test case description, separated by a semicolon. """ outcomes = {} with open(outcome_file, 'r', encoding='utf-8') as input_file: for line in input_file: (platform, config, suite, case, result, _cause) = line.split(';') key = ';'.join([suite, case]) setup = ';'.join([platform, config]) if key not in outcomes: outcomes[key] = TestCaseOutcomes() if result == 'PASS': outcomes[key].successes.append(setup) elif result == 'FAIL': outcomes[key].failures.append(setup) return outcomes def analyze_outcome_file(outcome_file): """Analyze the given outcome file.""" outcomes = read_outcome_file(outcome_file) return analyze_outcomes(outcomes) def main(): try: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('outcomes', metavar='OUTCOMES.CSV', help='Outcome file to analyze') options = parser.parse_args() results = analyze_outcome_file(options.outcomes) if results.error_count > 0: sys.exit(1) except Exception: # pylint: disable=broad-except # Print the backtrace and exit explicitly with our chosen status. traceback.print_exc() sys.exit(120) if __name__ == '__main__': main()