git.sesse.net Git - x264/commitdiff
Add Python regression test script
author Tony Young <rofflwaffls@gmail.com>
Thu, 25 Nov 2010 00:58:38 +0000 (16:58 -0800)
committer Fiona Glaser <fiona@x264.com>
Thu, 25 Nov 2010 21:46:23 +0000 (13:46 -0800)
Patch from Google Code-In.

.gitignore
tools/digress/__init__.py [new file with mode: 0644]
tools/digress/cli.py [new file with mode: 0644]
tools/digress/comparers.py [new file with mode: 0644]
tools/digress/constants.py [new file with mode: 0644]
tools/digress/errors.py [new file with mode: 0644]
tools/digress/scm/__init__.py [new file with mode: 0644]
tools/digress/scm/dummy.py [new file with mode: 0644]
tools/digress/scm/git.py [new file with mode: 0644]
tools/digress/testing.py [new file with mode: 0644]
tools/test_x264.py [new file with mode: 0755]

diff --git a/.gitignore b/.gitignore
index 9d8cb70ff189cdb3e2b77eab865d47fcd6f80ea6..ece28abf8c9fbe7fbbd30d28da603b6ffc44c7a6 100644 (file)
@@ -23,3 +23,6 @@ checkasm
 *.y4m
 *.yuv
 x264_2pass.log
+
+.digress_x264
+*.pyc
diff --git a/tools/digress/__init__.py b/tools/digress/__init__.py
new file mode 100644 (file)
index 0000000..8623d48
--- /dev/null
@@ -0,0 +1,12 @@
+"""
+Automated regression/unit testing suite.
+"""
+
+__version__ = '0.2'
+
+def digress(fixture):
+    """
+    Command-line helper for Digress.
+    """
+    from digress.cli import Dispatcher
+    Dispatcher(fixture).dispatch()
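
For illustration, a minimal sketch (not part of the patch) of how a script could hand a fixture to digress(); MyFixture is an invented placeholder, while tools/test_x264.py below instead subclasses digress.cli.Dispatcher directly:

    from digress import digress
    from digress.testing import Fixture

    class MyFixture(Fixture):      # hypothetical fixture, not defined by this patch
        pass

    if __name__ == "__main__":
        digress(MyFixture())       # wraps the fixture in a Dispatcher and runs the CLI
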
diff --git a/tools/digress/cli.py b/tools/digress/cli.py
new file mode 100644 (file)
index 0000000..44158a4
--- /dev/null
@@ -0,0 +1,149 @@
+"""
+Digress's CLI interface.
+"""
+
+import inspect
+import sys
+from optparse import OptionParser
+
+import textwrap
+
+from types import MethodType
+
+from digress import __version__ as version
+
+def dispatchable(func):
+    """
+    Mark a method as dispatchable.
+    """
+    func.digress_dispatchable = True
+    return func
+
+class Dispatcher(object):
+    """
+    Dispatcher for CLI commands.
+    """
+    def __init__(self, fixture):
+        self.fixture = fixture
+        fixture.dispatcher = self
+
+    def _monkey_print_help(self, optparse, *args, **kwargs):
+        # monkey patches OptionParser._print_help
+        OptionParser.print_help(optparse, *args, **kwargs)
+
+        print >>sys.stderr, "\nAvailable commands:"
+
+        maxlen = max([ len(command_name) for command_name in self.commands ])
+
+        descwidth = 80 - maxlen - 4
+
+        for command_name, command_meth in self.commands.iteritems():
+            print >>sys.stderr, "  %s %s\n" % (
+                command_name.ljust(maxlen + 1),
+                ("\n" + (maxlen + 4) * " ").join(
+                    textwrap.wrap(" ".join(filter(
+                            None,
+                            command_meth.__doc__.strip().replace("\n", " ").split(" ")
+                        )),
+                        descwidth
+                    )
+                )
+            )
+
+    def _enable_flush(self):
+        self.fixture.flush_before = True
+
+    def _populate_parser(self):
+        self.commands = self._get_commands()
+
+        self.optparse = OptionParser(
+            usage = "usage: %prog [options] command [args]",
+            description = "Digress CLI frontend for %s." % self.fixture.__class__.__name__,
+            version = "Digress %s" % version
+        )
+
+        self.optparse.print_help = MethodType(self._monkey_print_help, self.optparse, OptionParser)
+
+        self.optparse.add_option(
+            "-f",
+            "--flush",
+            action="callback",
+            callback=lambda option, opt, value, parser: self._enable_flush(),
+            help="flush existing data for a revision before testing"
+        )
+
+        self.optparse.add_option(
+            "-c",
+            "--cases",
+            metavar="FOO,BAR",
+            action="callback",
+            dest="cases",
+            type=str,
+            callback=lambda option, opt, value, parser: self._select_cases(*value.split(",")),
+            help="test cases to run; use the list command to see the full list"
+        )
+
+    def _select_cases(self, *cases):
+        self.fixture.cases = filter(lambda case: case.__name__ in cases, self.fixture.cases)
+
+    def _get_commands(self):
+        commands = {}
+
+        for name, member in inspect.getmembers(self.fixture):
+            if hasattr(member, "digress_dispatchable"):
+                commands[name] = member
+
+        return commands
+
+    def _run_command(self, name, *args):
+        if name not in self.commands:
+            print >>sys.stderr, "error: %s is not a valid command\n" % name
+            self.optparse.print_help()
+            return
+
+        command = self.commands[name]
+
+        argspec = inspect.getargspec(command)
+
+        max_arg_len = len(argspec.args) - 1
+        min_arg_len = max_arg_len - ((argspec.defaults is not None) and len(argspec.defaults) or 0)
+
+        if len(args) < min_arg_len:
+            print >>sys.stderr, "error: %s takes at least %d arguments\n" % (
+                name,
+                min_arg_len
+            )
+            print >>sys.stderr, "%s\n" % command.__doc__
+            self.optparse.print_help()
+            return
+
+        if len(args) > max_arg_len:
+            print >>sys.stderr, "error: %s takes at most %d arguments\n" % (
+                name,
+                max_arg_len
+            )
+            print >>sys.stderr, "%s\n" % command.__doc__
+            self.optparse.print_help()
+            return
+
+        command(*args)
+
+    def pre_dispatch(self):
+        pass
+
+    def dispatch(self):
+        self._populate_parser()
+
+        self.optparse.parse_args()
+        self.pre_dispatch()
+        args = self.optparse.parse_args()[1] # arguments may require reparsing after pre_dispatch; see test_x264.py
+
+        if len(args) == 0:
+            print >>sys.stderr, "error: no command specified\n"
+            self.optparse.print_help()
+            return
+
+        command = args[0]
+        addenda = args[1:]
+
+        self._run_command(command, *addenda)
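
As a rough sketch of the dispatch flow defined above (ExampleFixture and its hello command are invented, not part of the patch):

    from digress.cli import Dispatcher, dispatchable
    from digress.testing import Fixture

    class ExampleFixture(Fixture):
        @dispatchable
        def hello(self, name="world"):
            """
            Print a greeting; takes an optional name.
            """
            print "hello, %s" % name

    # "script.py hello x264" resolves to ExampleFixture.hello("x264"), and
    # "script.py --help" lists hello under "Available commands:".
    Dispatcher(ExampleFixture()).dispatch()
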
diff --git a/tools/digress/comparers.py b/tools/digress/comparers.py
new file mode 100644 (file)
index 0000000..215ffee
--- /dev/null
@@ -0,0 +1,68 @@
+"""
+Digress comparers.
+"""
+
+from digress.errors import ComparisonError
+
+import os
+from itertools import imap, izip
+
+def compare_direct(value_a, value_b):
+    if value_a != value_b:
+        raise ComparisonError("%s is not %s" % (value_a, value_b))
+
+def compare_pass(value_a, value_b):
+    """
+    Always succeeds, provided the test itself passed.
+    """
+
+def compare_tolerance(tolerance):
+    def _compare_tolerance(value_a, value_b):
+        if abs(value_a - value_b) > tolerance:
+            raise ComparisonError("%s is not %s (tolerance: %s)" % (
+                value_a,
+                value_b,
+                tolerance
+            ))
+    return _compare_tolerance
+
+def compare_files(file_a, file_b):
+    size_a = os.path.getsize(file_a)
+    size_b = os.path.getsize(file_b)
+
+    print file_a, file_b
+
+    if size_a != size_b:
+        raise ComparisonError("%s is not the same size as %s" % (
+            file_a,
+            file_b
+        ))
+
+    BUFFER_SIZE = 8196
+
+    offset = 0
+
+    with open(file_a) as f_a:
+        with open(file_b) as f_b:
+            for chunk_a, chunk_b in izip(
+                imap(
+                    lambda i: f_a.read(BUFFER_SIZE),
+                    xrange(size_a // BUFFER_SIZE + 1)
+                ),
+                imap(
+                    lambda i: f_b.read(BUFFER_SIZE),
+                    xrange(size_b // BUFFER_SIZE + 1)
+                )
+            ):
+                chunk_size = len(chunk_a)
+
+                if chunk_a != chunk_b:
+                    for i in xrange(chunk_size):
+                        if chunk_a[i] != chunk_b[i]:
+                            raise ComparisonError("%s differs from %s at offset %d" % (
+                                file_a,
+                                file_b,
+                                offset + i
+                            ))
+
+                offset += chunk_size
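
For a rough idea of intended use (not part of the patch), compare_tolerance() would typically be attached to a test via the comparer decorator from digress/testing.py; the case and the 0.05 tolerance below are invented:

    from digress.testing import Case, comparer
    from digress.comparers import compare_tolerance

    class ExampleMetrics(Case):
        @comparer(compare_tolerance(0.05))
        def test_psnr(self):
            # the returned metric is what compare/bisect later feed, for two
            # revisions, into compare_tolerance, which raises ComparisonError
            # when the difference exceeds the tolerance
            return 38.47
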
diff --git a/tools/digress/constants.py b/tools/digress/constants.py
new file mode 100644 (file)
index 0000000..1a18bab
--- /dev/null
@@ -0,0 +1,14 @@
+"""
+All of Digress's constants.
+"""
+
+TEST_PASS = 0
+TEST_FAIL = 1
+TEST_DISABLED = 2
+TEST_SKIPPED = 3
+
+CASE_PASS = 0
+CASE_FAIL = 1
+
+FIXTURE_PASS = 0
+FIXTURE_FAIL = 1
diff --git a/tools/digress/errors.py b/tools/digress/errors.py
new file mode 100644 (file)
index 0000000..8862829
--- /dev/null
@@ -0,0 +1,63 @@
+"""
+Digress errors.
+"""
+
+class DigressError(Exception):
+    """
+    Digress error base class.
+    """
+
+class NoSuchTestError(DigressError):
+    """
+    Raised when no such test exists.
+    """
+
+class DisabledTestError(DigressError):
+    """
+    Test is disabled.
+    """
+
+class SkippedTestError(DigressError):
+    """
+    Test is marked as skipped.
+    """
+
+class DisabledCaseError(DigressError):
+    """
+    Case is marked as disabled.
+    """
+
+class SkippedCaseError(DigressError):
+    """
+    Case is marked as skipped.
+    """
+
+class FailedTestError(DigressError):
+    """
+    Test failed.
+    """
+
+class ComparisonError(DigressError):
+    """
+    Comparison failed.
+    """
+
+class IncomparableError(DigressError):
+    """
+    Values cannot be compared.
+    """
+
+class AlreadyRunError(DigressError):
+    """
+    Test/case has already been run.
+    """
+
+class SCMError(DigressError):
+    """
+    Error occurred in SCM.
+    """
+    def __init__(self, message):
+        self.message = message.replace("\n", " ")
+
+    def __str__(self):
+        return self.message
diff --git a/tools/digress/scm/__init__.py b/tools/digress/scm/__init__.py
new file mode 100644 (file)
index 0000000..95ba161
--- /dev/null
@@ -0,0 +1,3 @@
+"""
+Source control backends for Digress.
+"""
diff --git a/tools/digress/scm/dummy.py b/tools/digress/scm/dummy.py
new file mode 100644 (file)
index 0000000..bf88a04
--- /dev/null
@@ -0,0 +1,41 @@
+"""
+Dummy SCM backend for Digress.
+"""
+
+from random import random
+
+def checkout(revision):
+    """
+    Checkout a revision.
+    """
+    pass
+
+def current_rev():
+    """
+    Get the current revision.
+    """
+    return str(random())
+
+def revisions(rev_a, rev_b):
+    """
+    Get a list of revisions from one to another.
+    """
+    pass
+
+def stash():
+    """
+    Stash the repository.
+    """
+    pass
+
+def unstash():
+    """
+    Unstash the repository.
+    """
+    pass
+
+def bisect(command, revision):
+    """
+    Perform a bisection.
+    """
+    raise NotImplementedError("dummy SCM backend does not support bisection")
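
An SCM backend is simply a module exposing these functions, selected by assigning it to Fixture.scm; a sketch with an invented fixture follows. Note that the testing core below also calls scm.dirty() and scm.rev_parse(), which this dummy module does not define, so it covers only part of the interface:

    from digress.testing import Fixture
    from digress.scm import dummy

    class ScratchFixture(Fixture):     # invented example fixture
        scm = dummy                    # module providing checkout/current_rev/revisions/stash/unstash/bisect
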
diff --git a/tools/digress/scm/git.py b/tools/digress/scm/git.py
new file mode 100644 (file)
index 0000000..01b508c
--- /dev/null
@@ -0,0 +1,99 @@
+"""
+Git SCM backend for Digress.
+"""
+
+from subprocess import Popen, PIPE, STDOUT
+from digress.errors import SCMError
+
+def checkout(revision):
+    """
+    Checkout a revision from git.
+    """
+    proc = Popen([
+        "git",
+        "checkout",
+        "-f",
+        revision
+    ], stdout=PIPE, stderr=STDOUT)
+
+    output = proc.communicate()[0].strip()
+    if proc.returncode != 0:
+        raise SCMError("checkout error: %s" % output)
+
+def rev_parse(ref):
+    proc = Popen([
+        "git",
+        "rev-parse",
+        ref
+    ], stdout=PIPE, stderr=STDOUT)
+
+    output = proc.communicate()[0].strip()
+    if proc.returncode != 0:
+        raise SCMError("rev-parse error: %s" % output)
+    return output
+
+def current_rev():
+    """
+    Get the current revision.
+    """
+    return rev_parse("HEAD")
+
+def revisions(rev_a, rev_b):
+    """
+    Get a list of revisions from one to another.
+    """
+    proc = Popen([
+        "git",
+        "log",
+        "--format=%H", ("%s...%s" % (rev_a, rev_b))
+    ], stdout=PIPE, stderr=STDOUT)
+
+    output = proc.communicate()[0].strip()
+    if proc.returncode != 0:
+        raise SCMError("log error: %s" % output)
+    return output.split("\n")
+
+def stash():
+    """
+    Stash the repository.
+    """
+    proc = Popen([
+        "git",
+        "stash",
+        "save",
+        "--keep-index"
+    ], stdout=PIPE, stderr=STDOUT)
+
+    output = proc.communicate()[0].strip()
+    if proc.returncode != 0:
+        raise SCMError("stash error: %s" % output)
+
+def unstash():
+    """
+    Unstash the repository.
+    """
+    proc = Popen(["git", "stash", "pop"], stdout=PIPE, stderr=STDOUT)
+    proc.communicate()
+
+def bisect(*args):
+    """
+    Perform a bisection.
+    """
+    proc = Popen((["git", "bisect"] + list(args)), stdout=PIPE, stderr=STDOUT)
+    output = proc.communicate()[0]
+    if proc.returncode != 0:
+        raise SCMError("bisect error: %s" % output)
+    return output
+
+def dirty():
+    """
+    Check if the working tree is dirty.
+    """
+    proc = Popen(["git", "status"], stdout=PIPE, stderr=STDOUT)
+    output = proc.communicate()[0].strip()
+    if proc.returncode != 0:
+        raise SCMError("status error: %s" % output)
+    return "modified:" in output
diff --git a/tools/digress/testing.py b/tools/digress/testing.py
new file mode 100644 (file)
index 0000000..bc45097
--- /dev/null
@@ -0,0 +1,582 @@
+"""
+Digress testing core.
+"""
+
+from digress.errors import SkippedTestError, DisabledTestError, NoSuchTestError, \
+                           FailedTestError, AlreadyRunError, SCMError, \
+                           ComparisonError
+from digress.constants import *
+from digress.cli import dispatchable
+
+import inspect
+import operator
+import os
+import json
+
+import textwrap
+
+from shutil import rmtree
+
+from time import time
+from functools import wraps
+
+from itertools import izip_longest
+
+class depends(object):
+    """
+    Dependency decorator for a test.
+    """
+    def __init__(self, *test_names):
+        self.test_names = test_names
+
+    def __call__(self, func):
+        func.digress_depends = self.test_names
+        return func
+
+class _skipped(object):
+    """
+    Internal skipped decorator.
+    """
+    def __init__(self, reason=""):
+        self._reason = reason
+
+    def __call__(self, func):
+        @wraps(func)
+        def _closure(*args):
+            raise SkippedTestError(self._reason)
+        return _closure
+
+class disabled(object):
+    """
+    Disable a test, with reason.
+    """
+    def __init__(self, reason=""):
+        self._reason = reason
+
+    def __call__(self, func):
+        @wraps(func)
+        def _closure(*args):
+            raise DisabledTestError(self._reason)
+        return _closure
+
+class comparer(object):
+    """
+    Set the comparer for a test.
+    """
+    def __init__(self, comparer_):
+        self._comparer = comparer_
+
+    def __call__(self, func):
+        func.digress_comparer = self._comparer
+        return func
+
+class Fixture(object):
+    cases = []
+    scm = None
+
+    flush_before = False
+
+    def _skip_case(self, case, depend):
+        for name, meth in inspect.getmembers(case):
+            if name[:5] == "test_":
+                setattr(
+                    case,
+                    name,
+                    _skipped("failed dependency: case %s" % depend)(meth)
+                )
+
+    def _run_case(self, case, results):
+        if case.__name__ in results:
+            raise AlreadyRunError
+
+        for depend in case.depends:
+            if depend.__name__ in results and results[depend.__name__]["status"] != CASE_PASS:
+                self._skip_case(case, depend.__name__)
+
+            try:
+                result = self._run_case(depend, results)
+            except AlreadyRunError:
+                continue
+
+            if result["status"] != CASE_PASS:
+                self._skip_case(case, depend.__name__)
+
+        result = case().run()
+        results[case.__name__] = result
+        return result
+
+    @dispatchable
+    def flush(self, revision=None):
+        """
+        Flush any cached results. Takes a revision as an optional argument.
+        """
+        if not revision:
+            print "Flushing all cached results...",
+
+            try:
+                rmtree(".digress_%s" % self.__class__.__name__)
+            except Exception, e:
+                print "failed: %s" % e
+            else:
+                print "done."
+        else:
+            try:
+                rev = self.scm.rev_parse(revision)
+            except SCMError, e:
+                print e
+            else:
+                print "Flushing cached results for %s..." % rev,
+
+                try:
+                    rmtree(os.path.join(".digress_%s" % self.__class__.__name__, rev))
+                except Exception, e:
+                    print "failed: %s" % e
+                else:
+                    print "done."
+
+    @dispatchable
+    def run(self, revision=None):
+        """
+        Run the fixture for a specified revision.
+
+        Takes a revision as an argument (defaults to HEAD if the working tree is clean).
+        """
+        oldrev = None
+        dirty = False
+
+        try:
+            dirty = self.scm.dirty()
+
+            # if the tree is clean, then we don't need to make an exception
+            if not dirty and revision is None: revision = "HEAD"
+
+            if revision:
+                oldrev = self.scm.current_rev()
+
+                if dirty:
+                    self.scm.stash()
+                self.scm.checkout(revision)
+
+                rev = self.scm.current_rev()
+
+                self.datastore = os.path.join(".digress_%s" % self.__class__.__name__, rev)
+
+                if os.path.isdir(self.datastore):
+                    if self.flush_before:
+                        self.flush(rev)
+
+                if not os.path.isdir(self.datastore):
+                    os.makedirs(self.datastore)
+            else:
+                rev = "(dirty working tree)"
+                self.datastore = None
+
+            print "Running fixture %s on revision %s...\n" % (self.__class__.__name__, rev)
+
+            results = {}
+
+            for case in self.cases:
+                try:
+                    self._run_case(case, results)
+                except AlreadyRunError:
+                    continue
+
+            total_time = reduce(operator.add, filter(
+                None,
+                [
+                    result["time"] for result in results.values()
+                ]
+            ), 0)
+
+            overall_status = (
+                CASE_FAIL in [ result["status"] for result in results.values() ]
+            ) and FIXTURE_FAIL or FIXTURE_PASS
+
+            print "Fixture %s in %.4f.\n" % (
+                (overall_status == FIXTURE_PASS) and "passed" or "failed",
+                total_time
+            )
+
+            return { "cases" : results, "time" : total_time, "status" : overall_status, "revision" : rev }
+
+        finally:
+            if oldrev:
+                self.scm.checkout(oldrev)
+                if dirty:
+                    self.scm.unstash()
+
+    @dispatchable
+    def bisect(self, good_rev, bad_rev=None):
+        """
+        Perform a bisection between two revisions.
+
+        First argument is the good revision, second is the bad revision, which
+        defaults to the current revision.
+        """
+        if not bad_rev: bad_rev = self.scm.current_rev()
+
+        dirty = False
+
+        # get a set of results for the good revision
+        good_result = self.run(good_rev)
+
+        good_rev = good_result["revision"]
+
+        try:
+            dirty = self.scm.dirty()
+
+            if dirty:
+                self.scm.stash()
+
+            self.scm.bisect("start")
+
+            self.scm.bisect("bad", bad_rev)
+            self.scm.bisect("good", good_rev)
+
+            bisecting = True
+            isbad = False
+
+            while bisecting:
+                results = self.run(self.scm.current_rev())
+                revision = results["revision"]
+
+                # perform comparisons
+                # FIXME: this just uses a lot of self.compare
+                for case_name, case_result in good_result["cases"].iteritems():
+                    case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
+
+                    for test_name, test_result in case_result["tests"].iteritems():
+                        test = filter(
+                            lambda pair: pair[0] == "test_%s" % test_name,
+                            inspect.getmembers(case)
+                        )[0][1]
+
+                        other_result = results["cases"][case_name]["tests"][test_name]
+
+                        if other_result["status"] == TEST_FAIL and case_result["status"] != TEST_FAIL:
+                            print "Revision %s failed %s.%s." % (revision, case_name, test_name)
+                            isbad = True
+                            break
+
+                        elif hasattr(test, "digress_comparer"):
+                            try:
+                                test.digress_comparer(test_result["value"], other_result["value"])
+                            except ComparisonError, e:
+                                print "%s differs: %s" % (test_name, e)
+                                isbad = True
+                                break
+
+                if isbad:
+                    output = self.scm.bisect("bad", revision)
+                    print "Marking revision %s as bad." % revision
+                else:
+                    output = self.scm.bisect("good", revision)
+                    print "Marking revision %s as good." % revision
+
+                if output.split("\n")[0].endswith("is the first bad commit"):
+                    print "\nBisection complete.\n"
+                    print output
+                    bisecting = False
+
+                print ""
+        except SCMError, e:
+            print e
+        finally:
+            self.scm.bisect("reset")
+
+            if dirty:
+                self.scm.unstash()
+
+    @dispatchable
+    def multicompare(self, rev_a=None, rev_b=None, mode="waterfall"):
+        """
+        Generate a comparison of tests.
+
+        Takes three optional arguments: the revision to compare from, the
+        revision to compare to, and the display mode (defaults to the vertical
+        "waterfall"; "river" gives a horizontal layout).
+        """
+        if not rev_a: rev_a = self.scm.current_rev()
+        if not rev_b: rev_b = self.scm.current_rev()
+
+        revisions = self.scm.revisions(rev_a, rev_b)
+
+        results = []
+
+        for revision in revisions:
+            results.append(self.run(revision))
+
+        test_names = reduce(operator.add, [
+            [
+                (case_name, test_name)
+                for
+                    test_name, test_result
+                in
+                    case_result["tests"].iteritems()
+            ]
+            for
+                case_name, case_result
+            in
+                results[0]["cases"].iteritems()
+        ], [])
+
+        MAXLEN = 20
+
+        colfmt = "| %s "
+
+        table = []
+
+        if mode not in ("waterfall", "river"):
+            mode = "waterfall"
+
+            print "Unknown multicompare mode specified, defaulting to %s." % mode
+
+        if mode == "waterfall":
+            header = [ "Test" ]
+
+            for result in results:
+                header.append(result["revision"])
+
+            table.append(header)
+
+            for test_name in test_names:
+                row_data = [ ".".join(test_name) ]
+
+                for result in results:
+                    test_result = result["cases"][test_name[0]]["tests"][test_name[1]]
+
+                    if test_result["status"] != TEST_PASS:
+                        value = "did not pass: %s" % (test_result["value"])
+                    else:
+                        value = "%s (%.4f)" % (test_result["value"], test_result["time"])
+
+                    row_data.append(value)
+
+                table.append(row_data)
+
+        elif mode == "river":
+            header = [ "Revision" ]
+
+            for test_name in test_names:
+                header.append(".".join(test_name))
+
+            table.append(header)
+
+            for result in results:
+                row_data = [ result["revision"] ]
+
+                for case_name, case_result in result["cases"].iteritems():
+                    for test_name, test_result in case_result["tests"].iteritems():
+
+                        if test_result["status"] != TEST_PASS:
+                            value = "did not pass: %s" % (test_result["value"])
+                        else:
+                            value = "%s (%.4f)" % (test_result["value"], test_result["time"])
+
+                        row_data.append(value)
+
+                table.append(row_data)
+
+        breaker = "=" * (len(colfmt % "".center(MAXLEN)) * len(table[0]) + 1)
+
+        print breaker
+
+        for row in table:
+            for row_stuff in izip_longest(*[
+                textwrap.wrap(col, MAXLEN, break_on_hyphens=False) for col in row
+            ], fillvalue=""):
+                row_output = ""
+
+                for col in row_stuff:
+                    row_output += colfmt % col.ljust(MAXLEN)
+
+                row_output += "|"
+
+                print row_output
+            print breaker
+
+    @dispatchable
+    def compare(self, rev_a, rev_b=None):
+        """
+        Compare two revisions directly.
+
+        Takes two arguments; the second is optional and defaults to the current revision.
+        """
+        results_a = self.run(rev_a)
+        results_b = self.run(rev_b)
+
+        for case_name, case_result in results_a["cases"].iteritems():
+            case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
+
+            header = "Comparison of case %s" % case_name
+            print header
+            print "=" * len(header)
+
+            for test_name, test_result in case_result["tests"].iteritems():
+                test = filter(
+                    lambda pair: pair[0] == "test_%s" % test_name,
+                    inspect.getmembers(case)
+                )[0][1]
+
+                other_result = results_b["cases"][case_name]["tests"][test_name]
+
+                if test_result["status"] != TEST_PASS or other_result["status"] != TEST_PASS:
+                    print "%s cannot be compared as one of the revisions has not passed it." % test_name
+
+                elif hasattr(test, "digress_comparer"):
+                    try:
+                        test.digress_comparer(test_result["value"], other_result["value"])
+                    except ComparisonError, e:
+                        print "%s differs: %s" % (test_name, e)
+                    else:
+                        print "%s does not differ." % test_name
+                else:
+                    print "%s has no comparer and therefore cannot be compared." % test_name
+
+            print ""
+
+    @dispatchable
+    def list(self):
+        """
+        List all available test cases, excluding dependencies.
+        """
+        print "\nAvailable Test Cases"
+        print "===================="
+        for case in self.cases:
+            print case.__name__
+
+    def register_case(self, case):
+        case.fixture = self
+        self.cases.append(case)
+
+class Case(object):
+    depends = []
+    fixture = None
+
+    def _get_test_by_name(self, test_name):
+        if not hasattr(self, "test_%s" % test_name):
+            raise NoSuchTestError(test_name)
+        return getattr(self, "test_%s" % test_name)
+
+    def _run_test(self, test, results):
+        test_name = test.__name__[5:]
+
+        if test_name in results:
+            raise AlreadyRunError
+
+        if hasattr(test, "digress_depends"):
+            for depend in test.digress_depends:
+                if depend in results and results[depend]["status"] != TEST_PASS:
+                    test = _skipped("failed dependency: %s" % depend)(test)
+
+                dependtest = self._get_test_by_name(depend)
+
+                try:
+                    result = self._run_test(dependtest, results)
+                except AlreadyRunError:
+                    continue
+
+                if result["status"] != TEST_PASS:
+                    test = _skipped("failed dependency: %s" % depend)(test)
+
+        start_time = time()
+        run_time = None
+
+        print "Running test %s..." % test_name,
+
+        try:
+            if not self.datastore:
+                # XXX: this smells funny
+                raise IOError
+
+            with open(os.path.join(self.datastore, "%s.json" % test_name), "r") as f:
+                result = json.load(f)
+
+            value = str(result["value"])
+
+            if result["status"] == TEST_DISABLED:
+                status = "disabled"
+            elif result["status"] == TEST_SKIPPED:
+                status = "skipped"
+            elif result["status"] == TEST_FAIL:
+                status = "failed"
+            elif result["status"] == TEST_PASS:
+                status = "passed"
+                value = "%s (in %.4f)" % (
+                    result["value"] or "(no result)",
+                    result["time"]
+                )
+            else:
+                status = "???"
+
+            print "%s (cached): %s" % (status, value)
+        except IOError:
+            try:
+                value = test()
+            except DisabledTestError, e:
+                print "disabled: %s" % e
+                status = TEST_DISABLED
+                value = str(e)
+            except SkippedTestError, e:
+                print "skipped: %s" % e
+                status = TEST_SKIPPED
+                value = str(e)
+            except FailedTestError, e:
+                print "failed: %s" % e
+                status = TEST_FAIL
+                value = str(e)
+            except Exception, e:
+                print "failed with exception: %s" % e
+                status = TEST_FAIL
+                value = str(e)
+            else:
+                run_time = time() - start_time
+                print "passed: %s (in %.4f)" % (
+                    value or "(no result)",
+                    run_time
+                )
+                status = TEST_PASS
+
+            result = { "status" : status, "value" : value, "time" : run_time }
+
+            if self.datastore:
+                with open(os.path.join(self.datastore, "%s.json" % test_name), "w") as f:
+                    json.dump(result, f)
+
+        results[test_name] = result
+        return result
+
+    def run(self):
+        print "Running case %s..." % self.__class__.__name__
+
+        if self.fixture.datastore:
+            self.datastore = os.path.join(self.fixture.datastore, self.__class__.__name__)
+            if not os.path.isdir(self.datastore):
+                os.makedirs(self.datastore)
+        else:
+            self.datastore = None
+
+        results = {}
+
+        for name, meth in inspect.getmembers(self):
+            if name[:5] == "test_":
+                try:
+                    self._run_test(meth, results)
+                except AlreadyRunError:
+                    continue
+
+        total_time = reduce(operator.add, filter(
+            None, [
+                result["time"] for result in results.values()
+            ]
+        ), 0)
+
+        overall_status = (
+            TEST_FAIL in [ result["status"] for result in results.values() ]
+        ) and CASE_FAIL or CASE_PASS
+
+        print "Case %s in %.4f.\n" % (
+            (overall_status == CASE_PASS) and "passed" or "failed",
+            total_time
+        )
+
+        return { "tests" : results, "time" : total_time, "status" : overall_status }
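
Tying the pieces above together, a hedged sketch of a fixture with one case and a dependent test (all names are invented; tools/test_x264.py below is the real user):

    from digress.testing import Fixture, Case, depends, comparer
    from digress.comparers import compare_direct
    from digress.scm import git

    class Build(Case):                     # invented case
        @comparer(compare_direct)
        def test_configure(self):
            return "ok"

        @depends("configure")              # dependency names refer to test_<name> methods
        @comparer(compare_direct)
        def test_make(self):
            return "ok"

    class ExampleFixture(Fixture):
        scm = git

    fixture = ExampleFixture()
    fixture.register_case(Build)
    fixture.run()                          # or drive it from the command line via digress.cli.Dispatcher
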
diff --git a/tools/test_x264.py b/tools/test_x264.py
new file mode 100755 (executable)
index 0000000..8defa59
--- /dev/null
@@ -0,0 +1,479 @@
+#!/usr/bin/env python
+
+import operator
+
+from optparse import OptionGroup
+
+import sys
+
+from time import time
+
+from digress.cli import Dispatcher as _Dispatcher
+from digress.errors import ComparisonError, FailedTestError, DisabledTestError
+from digress.testing import depends, comparer, Fixture, Case
+from digress.comparers import compare_pass
+from digress.scm import git as x264git
+
+from subprocess import Popen, PIPE, STDOUT
+
+import os
+import re
+import shlex
+import inspect
+
+from random import randrange, seed
+from math import ceil
+
+from itertools import imap, izip
+
+os.chdir(os.path.join(os.path.dirname(__file__), ".."))
+
+# options
+
+OPTIONS = [
+    [ "--tune %s" % t for t in ("film", "zerolatency") ],
+    ("", "--intra-refresh"),
+    ("", "--no-cabac"),
+    [ "--preset %s" % p for p in ("ultrafast",
+                                  "superfast",
+                                  "veryfast",
+                                  "faster",
+                                  "fast",
+                                  "medium",
+                                  "slow",
+                                  "slower") ]
+]
+
+# end options
+
+def compare_yuv_output(width, height):
+    def _compare_yuv_output(file_a, file_b):
+        size_a = os.path.getsize(file_a)
+        size_b = os.path.getsize(file_b)
+
+        if size_a != size_b:
+            raise ComparisonError("%s is not the same size as %s" % (
+                file_a,
+                file_b
+            ))
+
+        BUFFER_SIZE = 8196
+
+        offset = 0
+
+        with open(file_a) as f_a:
+            with open(file_b) as f_b:
+                for chunk_a, chunk_b in izip(
+                    imap(
+                        lambda i: f_a.read(BUFFER_SIZE),
+                        xrange(size_a // BUFFER_SIZE + 1)
+                    ),
+                    imap(
+                        lambda i: f_b.read(BUFFER_SIZE),
+                        xrange(size_b // BUFFER_SIZE + 1)
+                    )
+                ):
+                    chunk_size = len(chunk_a)
+
+                    if chunk_a != chunk_b:
+                        for i in xrange(chunk_size):
+                            if chunk_a[i] != chunk_b[i]:
+                                # calculate the macroblock, plane and frame from the offset
+                                offs = offset + i
+
+                                y_plane_area = width * height
+                                u_plane_area = y_plane_area + y_plane_area * 0.25
+                                v_plane_area = u_plane_area + y_plane_area * 0.25
+
+                                pixel = offs % v_plane_area
+                                frame = offs // v_plane_area
+
+                                if pixel < y_plane_area:
+                                    plane = "Y"
+
+                                    pixel_x = pixel % width
+                                    pixel_y = pixel // width
+
+                                    macroblock = (ceil(pixel_x / 16.0), ceil(pixel_y / 16.0))
+                                elif pixel < u_plane_area:
+                                    plane = "U"
+
+                                    pixel -= y_plane_area
+
+                                    pixel_x = pixel % width
+                                    pixel_y = pixel // width
+
+                                    macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
+                                else:
+                                    plane = "V"
+
+                                    pixel -= u_plane_area
+
+                                    pixel_x = pixel % width
+                                    pixel_y = pixel // width
+
+                                    macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
+
+                                macroblock = tuple([ int(x) for x in macroblock ])
+
+                                raise ComparisonError("%s differs from %s at frame %d, " \
+                                                      "macroblock %s on the %s plane (offset %d)" % (
+                                    file_a,
+                                    file_b,
+                                    frame,
+                                    macroblock,
+                                    plane,
+                                    offs)
+                                )
+
+                    offset += chunk_size
+
+    return _compare_yuv_output
+
+def program_exists(program):
+    def is_exe(fpath):
+        return os.path.exists(fpath) and os.access(fpath, os.X_OK)
+
+    fpath, fname = os.path.split(program)
+
+    if fpath:
+        if is_exe(program):
+            return program
+    else:
+        for path in os.environ["PATH"].split(os.pathsep):
+            exe_file = os.path.join(path, program)
+            if is_exe(exe_file):
+                return exe_file
+
+    return None
+
+class x264(Fixture):
+    scm = x264git
+
+class Compile(Case):
+    @comparer(compare_pass)
+    def test_configure(self):
+        Popen([
+            "make",
+            "distclean"
+        ], stdout=PIPE, stderr=STDOUT).communicate()
+
+        configure_proc = Popen([
+            "./configure"
+        ] + self.fixture.dispatcher.configure, stdout=PIPE, stderr=STDOUT)
+
+        output = configure_proc.communicate()[0]
+        if configure_proc.returncode != 0:
+            raise FailedTestError("configure failed: %s" % output.replace("\n", " "))
+
+    @depends("configure")
+    @comparer(compare_pass)
+    def test_make(self):
+        make_proc = Popen([
+            "make",
+            "-j5"
+        ], stdout=PIPE, stderr=STDOUT)
+
+        output = make_proc.communicate()[0]
+        if make_proc.returncode != 0:
+            raise FailedTestError("make failed: %s" % output.replace("\n", " "))
+
+_dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")
+
+def _YUVOutputComparisonFactory():
+    class YUVOutputComparison(Case):
+        _dimension_pattern = _dimension_pattern
+
+        depends = [ Compile ]
+        options = []
+
+        def __init__(self):
+            for name, meth in inspect.getmembers(self):
+                if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
+                    delattr(self.__class__, name)
+
+        def _run_x264(self):
+            x264_proc = Popen([
+                "./x264",
+                "-o",
+                "%s.264" % self.fixture.dispatcher.video,
+                "--dump-yuv",
+                "x264-output.yuv"
+            ] + self.options + [
+                self.fixture.dispatcher.video
+            ], stdout=PIPE, stderr=STDOUT)
+
+            output = x264_proc.communicate()[0]
+            if x264_proc.returncode != 0:
+                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+            matches = _dimension_pattern.match(output)
+
+            return (int(matches.group(1)), int(matches.group(2)))
+
+        @comparer(compare_pass)
+        def test_jm(self):
+            if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")
+
+            try:
+                runres = self._run_x264()
+
+                jm_proc = Popen([
+                    "ldecod",
+                    "-i",
+                    "%s.264" % self.fixture.dispatcher.video,
+                    "-o",
+                    "jm-output.yuv"
+                ], stdout=PIPE, stderr=STDOUT)
+
+                output = jm_proc.communicate()[0]
+                if jm_proc.returncode != 0:
+                    raise FailedTestError("jm did not complete properly: %s" % output.replace("\n", " "))
+
+                try:
+                    compare_yuv_output(*runres)("x264-output.yuv", "jm-output.yuv")
+                except ComparisonError, e:
+                    raise FailedTestError(e)
+            finally:
+                try: os.remove("x264-output.yuv")
+                except: pass
+
+                try: os.remove("%s.264" % self.fixture.dispatcher.video)
+                except: pass
+
+                try: os.remove("jm-output.yuv")
+                except: pass
+
+                try: os.remove("log.dec")
+                except: pass
+
+                try: os.remove("dataDec.txt")
+                except: pass
+
+        @comparer(compare_pass)
+        def test_ffmpeg(self):
+            if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")
+            try:
+                runres = self._run_x264()
+
+                ffmpeg_proc = Popen([
+                    "ffmpeg",
+                    "-i",
+                    "%s.264" % self.fixture.dispatcher.video,
+                    "ffmpeg-output.yuv"
+                ], stdout=PIPE, stderr=STDOUT)
+
+                output = ffmpeg_proc.communicate()[0]
+                if ffmpeg_proc.returncode != 0:
+                    raise FailedTestError("ffmpeg did not complete properly: %s" % output.replace("\n", " "))
+
+                try:
+                    compare_yuv_output(*runres)("x264-output.yuv", "ffmpeg-output.yuv")
+                except ComparisonError, e:
+                    raise FailedTestError(e)
+            finally:
+                try: os.remove("x264-output.yuv")
+                except: pass
+
+                try: os.remove("%s.264" % self.fixture.dispatcher.video)
+                except: pass
+
+                try: os.remove("ffmpeg-output.yuv")
+                except: pass
+
+    return YUVOutputComparison
+
+class Regression(Case):
+    depends = [ Compile ]
+
+    _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
+    _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")
+
+    def __init__(self):
+        if self.fixture.dispatcher.x264:
+            self.__class__.__name__ += " %s" % " ".join(self.fixture.dispatcher.x264)
+
+    def test_psnr(self):
+        try:
+            x264_proc = Popen([
+                "./x264",
+                "-o",
+                "%s.264" % self.fixture.dispatcher.video,
+                "--psnr"
+            ] + self.fixture.dispatcher.x264 + [
+                self.fixture.dispatcher.video
+            ], stdout=PIPE, stderr=STDOUT)
+
+            output = x264_proc.communicate()[0]
+
+            if x264_proc.returncode != 0:
+                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+            for line in output.split("\n"):
+                if line.startswith("x264 [info]: PSNR Mean"):
+                    return float(self._psnr_pattern.match(line).group(1))
+
+            raise FailedTestError("no PSNR output caught from x264")
+        finally:
+            try: os.remove("%s.264" % self.fixture.dispatcher.video)
+            except: pass
+
+    def test_ssim(self):
+        try:
+            x264_proc = Popen([
+                "./x264",
+                "-o",
+                "%s.264" % self.fixture.dispatcher.video,
+                "--ssim"
+            ] + self.fixture.dispatcher.x264 + [
+                self.fixture.dispatcher.video
+            ], stdout=PIPE, stderr=STDOUT)
+
+            output = x264_proc.communicate()[0]
+
+            if x264_proc.returncode != 0:
+                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+            for line in output.split("\n"):
+                if line.startswith("x264 [info]: SSIM Mean"):
+                    return float(self._ssim_pattern.match(line).group(1))
+
+            raise FailedTestError("no SSIM output caught from x264")
+        finally:
+            try: os.remove("%s.264" % self.fixture.dispatcher.video)
+            except: pass
+
+def _generate_random_commandline():
+    commandline = []
+
+    for suboptions in OPTIONS:
+        commandline.append(suboptions[randrange(0, len(suboptions))])
+
+    return filter(None, reduce(operator.add, [ shlex.split(opt) for opt in commandline ]))
+
+_generated = []
+
+fixture = x264()
+fixture.register_case(Compile)
+
+fixture.register_case(Regression)
+
+class Dispatcher(_Dispatcher):
+    video = "akiyo_qcif.y4m"
+    products = 50
+    configure = []
+    x264 = []
+    yuv_tests = [ "jm" ]
+
+    def _populate_parser(self):
+        super(Dispatcher, self)._populate_parser()
+
+        # don't do a whole lot with this
+        tcase = _YUVOutputComparisonFactory()
+
+        yuv_tests = [ name[5:] for name, meth in filter(lambda pair: pair[0][:5] == "test_", inspect.getmembers(tcase)) ]
+
+        group = OptionGroup(self.optparse, "x264 testing-specific options")
+
+        group.add_option(
+            "-v",
+            "--video",
+            metavar="FILENAME",
+            action="callback",
+            dest="video",
+            type=str,
+            callback=lambda option, opt, value, parser: setattr(self, "video", value),
+            help="yuv video to perform testing on (default: %s)" % self.video
+        )
+
+        group.add_option(
+            "-s",
+            "--seed",
+            metavar="SEED",
+            action="callback",
+            dest="seed",
+            type=int,
+            callback=lambda option, opt, value, parser: setattr(self, "seed", value),
+            help="seed for the random number generator (default: unix timestamp)"
+        )
+
+        group.add_option(
+            "-p",
+            "--product-tests",
+            metavar="NUM",
+            action="callback",
+            dest="products",
+            type=int,
+            callback=lambda option, opt, value, parser: setattr(self, "products", value),
+            help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
+        )
+
+        group.add_option(
+            "--configure-with",
+            metavar="FLAGS",
+            action="callback",
+            dest="configure",
+            type=str,
+            callback=lambda option, opt, value, parser: setattr(self, "configure", shlex.split(value)),
+            help="options to run ./configure with"
+        )
+
+        group.add_option(
+            "--yuv-tests",
+            action="callback",
+            dest="yuv_tests",
+            type=str,
+            callback=lambda option, opt, value, parser: setattr(self, "yuv_tests", [
+                val.strip() for val in value.split(",")
+            ]),
+            help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
+                ", ".join(self.yuv_tests),
+                ", ".join(yuv_tests)
+            )
+        )
+
+        group.add_option(
+            "--x264-with",
+            metavar="FLAGS",
+            action="callback",
+            dest="x264",
+            type=str,
+            callback=lambda option, opt, value, parser: setattr(self, "x264", shlex.split(value)),
+            help="additional options to run ./x264 with"
+        )
+
+        self.optparse.add_option_group(group)
+
+    def pre_dispatch(self):
+        if not hasattr(self, "seed"):
+            self.seed = int(time())
+
+        print "Using seed: %d" % self.seed
+        seed(self.seed)
+
+        for i in xrange(self.products):
+            YUVOutputComparison = _YUVOutputComparisonFactory()
+
+            commandline = _generate_random_commandline()
+
+            counter = 0
+
+            while commandline in _generated:
+                counter += 1
+                commandline = _generate_random_commandline()
+
+                if counter > 100:
+                    print >>sys.stderr, "Maximum command-line regeneration exceeded. "  \
+                                        "Try a different seed or specify fewer products to generate."
+                    sys.exit(1)
+
+            commandline += self.x264
+
+            _generated.append(commandline)
+
+            YUVOutputComparison.options = commandline
+            YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(commandline)))
+
+            fixture.register_case(YUVOutputComparison)
+
+Dispatcher(fixture).dispatch()
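
For reference, a hedged sketch of how the new front-end might be invoked (option names are taken from the script above; the clip name and revision arguments are placeholders), expressed via subprocess purely for illustration:

    import subprocess

    # run the whole fixture against HEAD, printing per-test results
    subprocess.call(["./tools/test_x264.py", "run", "HEAD"])

    # compare two revisions on a different clip with a fixed RNG seed
    subprocess.call(["./tools/test_x264.py",
                     "--video", "foreman_qcif.y4m",
                     "--seed", "1234",
                     "compare", "REV_A", "REV_B"])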