diff --git a/.pylintrc b/.pylintrc index d427bcb3..8d61ac64 100644 --- a/.pylintrc +++ b/.pylintrc @@ -297,7 +297,7 @@ exclude-too-few-public-methods= ignored-parents= # Maximum number of arguments for function / method. -max-args=7 +max-args=8 # Maximum number of attributes for a class (see R0902). max-attributes=16 @@ -315,7 +315,7 @@ max-locals=16 max-parents=7 # Maximum number of positional arguments for function / method. -max-positional-arguments=7 +max-positional-arguments=8 # Maximum number of public methods for a class (see R0904). max-public-methods=20 diff --git a/Test/astc_test_image.py b/Test/astc_test_image.py index a229e6e1..fe992c1e 100644 --- a/Test/astc_test_image.py +++ b/Test/astc_test_image.py @@ -33,14 +33,13 @@ ''' import argparse -import os from pathlib import Path import sys from typing import Optional import testlib.encoder as te -import testlib.testset as tts -import testlib.resultset as trs +from testlib.testset import TestImage, TestSet +from testlib.resultset import Record, ResultSet, ResultStatus # Require bit exact with reference scores RESULT_THRESHOLD_WARN = -0.00 @@ -69,7 +68,7 @@ def is_3d(block_size: str) -> bool: return block_size.count('x') == 2 -def count_test_set(test_set: tts.TestSet, block_sizes: list[str]) -> int: +def count_test_set(test_set: TestSet, block_sizes: list[str]) -> int: ''' Count the number of test executions needed for a test set. @@ -92,8 +91,8 @@ def count_test_set(test_set: tts.TestSet, block_sizes: list[str]) -> int: return count -def determine_result(image: tts.TestImage, reference: trs.Record, - result: trs.Record) -> trs.ResultStatus: +def determine_result(image: TestImage, reference: Record, + result: Record) -> ResultStatus: ''' Determine a test result against a reference and thresholds. @@ -108,18 +107,18 @@ def determine_result(image: tts.TestImage, reference: trs.Record, psnr_diff = result.psnr - reference.psnr if (psnr_diff < RESULT_THRESHOLD_FAIL) and (not image.is_3d): - return trs.ResultStatus.FAIL + return ResultStatus.FAIL if (psnr_diff < RESULT_THRESHOLD_3D_FAIL) and image.is_3d: - return trs.ResultStatus.FAIL + return ResultStatus.FAIL if psnr_diff < RESULT_THRESHOLD_WARN: - return trs.ResultStatus.WARN + return ResultStatus.WARN - return trs.ResultStatus.PASS + return ResultStatus.PASS -def format_solo_result(image: tts.TestImage, result: trs.Record) -> str: +def format_solo_result(image: TestImage, result: Record) -> str: ''' Format a metrics string for a single (no compare) result. @@ -149,8 +148,8 @@ def format_solo_result(image: tts.TestImage, result: trs.Record) -> str: return ' | '.join(fields) -def format_compare_result(image: tts.TestImage, reference: trs.Record, - result: trs.Record) -> str: +def format_compare_result(image: TestImage, reference: Record, + result: Record) -> str: ''' Format a metrics string for a comparison result. @@ -194,27 +193,68 @@ def format_compare_result(image: tts.TestImage, reference: trs.Record, return ' | '.join(fields) -def run_test_set(encoder, reference, test_set, quality, block_sizes, repeats, - keep_output, threads): +def run_test_image(encoder: te.EncoderBase, reference: Optional[ResultSet], + image: TestImage, quality: str, block_size: str, + repeats: int, keep_output: bool, + threads: Optional[int]) -> tuple[Record, str]: + ''' + Execute one test image in the test set. + + Args: + encoder: The encoder to test. + reference: The reference test results to compare against. + image: The single image to test. + quality: The quality level to test. + block_size: The block size to test. + repeats: The number of test repeats to run. + keep_output: Should the test preserve output images? + threads (int or None): The thread count to use, or None to use + automatic thread count based on core count of host machine. + + Return: + The test results, passed as a test record and a formatted log line + to emit. + ''' + res = encoder.run_test( + image, block_size, f'-{quality}', repeats, + keep_output, threads) + + record = Record(block_size, image.file_name, *res) + + if reference: + record_ref = reference.get_matching_record(record) + record.set_status(determine_result(image, record_ref, record)) + record.set_relative_to_reference(record_ref) + + report = format_compare_result(image, record_ref, record) + else: + report = format_solo_result(image, record) + + return (record, report) + + +def run_test_set(encoder: te.EncoderBase, reference: Optional[ResultSet], + test_set: TestSet, quality: str, block_sizes: list[str], + repeats: int, keep_output: bool, + threads: Optional[int]) -> ResultSet: ''' Execute all tests in the test set. Args: - encoder (EncoderBase): The encoder to use. - reference (ResultSet): The test reference results. - test_set (TestSet): The test set. - quality (str): The quality level to execute the test against. - block_sizes (list(str)): The block sizes to execute each test against. - repeats (int): The number of test repeats to run for each image test. - keep_output (bool): Should the test preserve output images? This is - only a hint and discarding output may be ignored if the encoder - version used can't do it natively. - threads (int or None): The thread count to use. + encoder: The encoder to test. + reference: The reference test results to compare against. + test_set: The set of images to test. + quality: The quality level to test. + block_sizes: The block sizes to test. + repeats: The number of test repeats to run. + keep_output: Should the test preserve output images? + threads (int or None): The thread count to use, or None to use + automatic thread count based on core count of host machine. Return: - ResultSet: The test results. + The test results. ''' - result_set = trs.ResultSet(test_set.name) + result_set = ResultSet(test_set.name) current_test_index = 0 max_test_index = count_test_set(test_set, block_sizes) @@ -235,35 +275,13 @@ def run_test_set(encoder, reference, test_set, quality, block_sizes, repeats, msg = f'Running {progress} {block_size} {image.file_name} ... ' print(msg, end='', flush=True) - res = encoder.run_test( - image, block_size, f'-{quality}', repeats, - keep_output, threads) - - res = trs.Record(block_size, image.file_name, *res) - result_set.add_record(res) - - if reference: - result_ref = reference.get_matching_record(res) - res.set_status(determine_result(image, result_ref, res)) - - try: - res.total_time_rel = result_ref.total_time / res.total_time - except ZeroDivisionError: - res.total_time_rel = float('NaN') - - try: - res.coding_time_rel = \ - result_ref.coding_time / res.coding_time - except ZeroDivisionError: - res.coding_time_rel = float('NaN') - - res.psnr_rel = res.psnr - result_ref.psnr + record, report = run_test_image( + encoder, reference, image, quality, block_size, + repeats, keep_output, threads) - res = format_compare_result(image, result_ref, res) - else: - res = format_solo_result(image, res) + result_set.add_record(record) - print(f'\r[{current_test_index:>3}] {res}') + print(f'\r[{current_test_index:>3}] {report}') return result_set @@ -348,56 +366,54 @@ def parse_command_line(): 'sse2', 'sse4.1', 'avx2' ] - coders = reference_encoders + test_encoders + ['all-aarch64', 'all-x86'] + encoder_choices = reference_encoders + test_encoders + encoder_choices.append('all-aarch64') + encoder_choices.append('all-x86') - parser.add_argument('--encoder', dest='encoders', - default='avx2', - choices=coders, help='test encoder variant') + parser.add_argument('--encoder', dest='encoders', default='avx2', + choices=encoder_choices, help='test encoder variant') parser.add_argument('--reference', default='ref-main-avx2', choices=reference_encoders, help='reference encoder variant') - astc_profile = ['ldr', 'ldrs', 'hdr', 'all'] + profile_choices = ['ldr', 'ldrs', 'hdr', 'all'] parser.add_argument('--color-profile', dest='profiles', default='all', - choices=astc_profile, help='test color profile') + choices=profile_choices, help='test color profile') - color_format = ['l', 'xy', 'rgb', 'rgba', 'all'] + format_choices = ['l', 'xy', 'rgb', 'rgba', 'all'] parser.add_argument('--color-format', dest='formats', default='all', - choices=color_format, help='test color format') - - choices = list(TEST_BLOCK_SIZES) + ['all'] - parser.add_argument('--block-size', dest='block_sizes', - action='append', choices=choices, - help='test block size') - - test_dir = os.path.dirname(__file__) - test_dir = os.path.join(test_dir, 'Images') - test_sets = [] - for path in os.listdir(test_dir): - full_path = os.path.join(test_dir, path) - if os.path.isdir(full_path): - test_sets.append(path) - test_sets.append('all') + choices=format_choices, help='test color format') + block_size_choices = list(TEST_BLOCK_SIZES) + ['all'] + parser.add_argument('--block-size', dest='block_sizes', action='append', + choices=block_size_choices, help='test block size') + + test_dir = Path(__file__).parent / 'Images' + + test_set_choices = [x.name for x in test_dir.iterdir() if x.is_dir()] + test_set_choices.sort() + test_set_choices.append('all') parser.add_argument('--test-set', dest='test_sets', default='Small', - choices=test_sets, help='test image test set') + choices=test_set_choices, help='test image set') parser.add_argument('--test-image', dest='test_image', default=None, help='select a specific test image from the test set') - choices = list(TEST_QUALITIES) + ['all', 'all+'] + quality_choices = list(TEST_QUALITIES) + quality_choices.append('all') + quality_choices.append('all+') parser.add_argument('--test-quality', dest='test_qual', default='thorough', - choices=choices, help='select a specific test quality') + choices=quality_choices, help='test quality') parser.add_argument('--repeats', dest='repeats', default=1, - type=int, help='test iteration count') + type=int, help='test repeat count') parser.add_argument('--keep-output', dest='keep_output', default=False, action='store_true', help='keep image output') parser.add_argument('-j', dest='threads', default=None, - type=int, help='thread count') + type=int, help='encoder thread count') args = parser.parse_args() @@ -421,14 +437,20 @@ def parse_command_line(): if not args.block_sizes or ('all' in args.block_sizes): args.block_sizes = TEST_BLOCK_SIZES - args.test_sets = test_sets[:-1] if args.test_sets == 'all' \ - else [args.test_sets] + if args.test_sets == 'all': + args.test_sets = test_set_choices[:-1] + else: + args.test_sets = [args.test_sets] - args.profiles = astc_profile[:-1] if args.profiles == 'all' \ - else [args.profiles] + if args.profiles == 'all': + args.profiles = profile_choices[:-1] + else: + args.profiles = [args.profiles] - args.formats = color_format[:-1] if args.formats == 'all' \ - else [args.formats] + if args.formats == 'all': + args.formats = format_choices[:-1] + else: + args.formats = [args.formats] return args @@ -444,7 +466,7 @@ def main() -> int: args = parse_command_line() test_set_count = 0 - worst_result = trs.ResultStatus.NOT_RUN + worst_result = ResultStatus.NOT_RUN for quality in args.test_qual: for test_set_name in args.test_sets: @@ -460,11 +482,11 @@ def main() -> int: if reference_name: ref_csv = f'astc_{reference_name}_{quality}_results.csv' ref_csv = f'{test_dir}/{ref_csv}' - reference = trs.ResultSet(test_set_name) + reference = ResultSet(test_set_name) reference.load_from_file(Path(ref_csv)) test_set_count += 1 - test_set = tts.TestSet( + test_set = TestSet( test_set_name, Path(test_dir), args.profiles, args.formats, args.test_image) @@ -481,10 +503,10 @@ def main() -> int: worst_result = max(this_result, worst_result) print(summary) - if (test_set_count > 1) and (worst_result != trs.ResultStatus.NOT_RUN): + if (test_set_count > 1) and (worst_result != ResultStatus.NOT_RUN): print(f'OVERALL STATUS: {worst_result.name}') - if worst_result == trs.ResultStatus.FAIL: + if worst_result == ResultStatus.FAIL: return 1 return 0 diff --git a/Test/testlib/resultset.py b/Test/testlib/resultset.py index eca6c08f..18e33729 100644 --- a/Test/testlib/resultset.py +++ b/Test/testlib/resultset.py @@ -32,8 +32,6 @@ import numpy -from .testset import TestSet - @enum.unique class ResultStatus(enum.IntEnum): @@ -105,24 +103,43 @@ def set_status(self, result: ResultStatus) -> None: ''' self.status = result + def set_relative_to_reference(self, reference: Record) -> None: + ''' + Set relative results compared to an existing reference score. + + Args: + reference: The reference result to compare against. + ''' + self.psnr_rel = self.psnr - reference.psnr + + try: + self.total_time_rel = reference.total_time / self.total_time + except ZeroDivisionError: + self.total_time_rel = float('NaN') + + try: + self.coding_time_rel = reference.coding_time / self.coding_time + except ZeroDivisionError: + self.coding_time_rel = float('NaN') + class ResultSet: ''' A set of results for a TestSet, across one or more block sizes. Attributes: - test_set: The test set these results were generated by. + test_set_name: The name of the test set that generated these results. records: The list of test result records. ''' - def __init__(self, test_set: TestSet): + def __init__(self, test_set_name: str): ''' Create a new empty ResultSet. Args: - test_set: The test set these results were generated by. + test_set_name: The test set these results were generated by. ''' - self.test_set = test_set + self.test_set_name = test_set_name self.records: list[Record] = [] def add_record(self, record: Record) -> None: @@ -213,7 +230,7 @@ def _save_record(self, writer, record: Record) -> None: record (Record): The record to write. ''' row = [ - self.test_set, + self.test_set_name, record.block_size, record.name, f'{record.psnr:0.4f}', @@ -239,7 +256,7 @@ def load_from_file(self, file_path: Path) -> None: # Read the data rows for row in reader: - assert row[0] == self.test_set + assert row[0] == self.test_set_name record = Record( row[1],