Patch from Google Code-In: add Digress, an automated regression/unit testing framework, and an x264 test harness built on it.
*.y4m
*.yuv
x264_2pass.log
+
+.digress_x264
+*.pyc
--- /dev/null
+"""
+Automated regression/unit testing suite.
+"""
+
+__version__ = '0.2'
+
+def digress(fixture):
+ """
+ Command-line helper for Digress.
+ """
+ from digress.cli import Dispatcher
+ Dispatcher(fixture).dispatch()
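+
+# Minimal usage sketch (hypothetical fixture module; names are illustrative):
+#
+#     from digress import digress
+#     from digress.testing import Fixture
+#
+#     class MyFixture(Fixture):
+#         scm = None  # assign an SCM backend, e.g. digress.scm.git
+#
+#     if __name__ == "__main__":
+#         digress(MyFixture())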
--- /dev/null
+"""
+Digress's CLI interface.
+"""
+
+import inspect
+import sys
+from optparse import OptionParser
+
+import textwrap
+
+from types import MethodType
+
+from digress import __version__ as version
+
+def dispatchable(func):
+ """
+ Mark a method as dispatchable.
+ """
+ func.digress_dispatchable = True
+ return func
+
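+# Usage sketch (hypothetical fixture; digress.testing.Fixture marks its real
+# commands this way):
+#
+#     class MyFixture(Fixture):
+#         @dispatchable
+#         def hello(self):
+#             """Print a greeting."""
+#             print "hello"
+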
+class Dispatcher(object):
+ """
+ Dispatcher for CLI commands.
+ """
+ def __init__(self, fixture):
+ self.fixture = fixture
+ fixture.dispatcher = self
+
+ def _monkey_print_help(self, optparse, *args, **kwargs):
+        # monkey-patches OptionParser.print_help to append the command list
+ OptionParser.print_help(optparse, *args, **kwargs)
+
+ print >>sys.stderr, "\nAvailable commands:"
+
+ maxlen = max([ len(command_name) for command_name in self.commands ])
+
+ descwidth = 80 - maxlen - 4
+
+ for command_name, command_meth in self.commands.iteritems():
+ print >>sys.stderr, " %s %s\n" % (
+ command_name.ljust(maxlen + 1),
+ ("\n" + (maxlen + 4) * " ").join(
+ textwrap.wrap(" ".join(filter(
+ None,
+                    (command_meth.__doc__ or "").strip().replace("\n", " ").split(" ")
+ )),
+ descwidth
+ )
+ )
+ )
+
+ def _enable_flush(self):
+ self.fixture.flush_before = True
+
+ def _populate_parser(self):
+ self.commands = self._get_commands()
+
+ self.optparse = OptionParser(
+ usage = "usage: %prog [options] command [args]",
+ description = "Digress CLI frontend for %s." % self.fixture.__class__.__name__,
+ version = "Digress %s" % version
+ )
+
+ self.optparse.print_help = MethodType(self._monkey_print_help, self.optparse, OptionParser)
+
+ self.optparse.add_option(
+ "-f",
+ "--flush",
+ action="callback",
+ callback=lambda option, opt, value, parser: self._enable_flush(),
+ help="flush existing data for a revision before testing"
+ )
+
+ self.optparse.add_option(
+ "-c",
+ "--cases",
+ metavar="FOO,BAR",
+ action="callback",
+ dest="cases",
+ type=str,
+ callback=lambda option, opt, value, parser: self._select_cases(*value.split(",")),
+ help="test cases to run, run with command list to see full list"
+ )
+
+ def _select_cases(self, *cases):
+ self.fixture.cases = filter(lambda case: case.__name__ in cases, self.fixture.cases)
+
+ def _get_commands(self):
+ commands = {}
+
+ for name, member in inspect.getmembers(self.fixture):
+ if hasattr(member, "digress_dispatchable"):
+ commands[name] = member
+
+ return commands
+
+ def _run_command(self, name, *args):
+ if name not in self.commands:
+ print >>sys.stderr, "error: %s is not a valid command\n" % name
+ self.optparse.print_help()
+ return
+
+ command = self.commands[name]
+
+ argspec = inspect.getargspec(command)
+
+ max_arg_len = len(argspec.args) - 1
+        min_arg_len = max_arg_len - (len(argspec.defaults) if argspec.defaults is not None else 0)
+
+ if len(args) < min_arg_len:
+ print >>sys.stderr, "error: %s takes at least %d arguments\n" % (
+ name,
+ min_arg_len
+ )
+ print >>sys.stderr, "%s\n" % command.__doc__
+ self.optparse.print_help()
+ return
+
+ if len(args) > max_arg_len:
+ print >>sys.stderr, "error: %s takes at most %d arguments\n" % (
+ name,
+ max_arg_len
+ )
+ print >>sys.stderr, "%s\n" % command.__doc__
+ self.optparse.print_help()
+ return
+
+ command(*args)
+
+ def pre_dispatch(self):
+ pass
+
+ def dispatch(self):
+ self._populate_parser()
+
+ self.optparse.parse_args()
+ self.pre_dispatch()
+ args = self.optparse.parse_args()[1] # arguments may require reparsing after pre_dispatch; see test_x264.py
+
+ if len(args) == 0:
+ print >>sys.stderr, "error: no comamnd specified\n"
+ self.optparse.print_help()
+ return
+
+ command = args[0]
+ addenda = args[1:]
+
+ self._run_command(command, *addenda)
--- /dev/null
+"""
+Digress comparers.
+"""
+
+from digress.errors import ComparisonError
+
+import os
+from itertools import imap, izip
+
+def compare_direct(value_a, value_b):
+ if value_a != value_b:
+ raise ComparisonError("%s is not %s" % (value_a, value_b))
+
+def compare_pass(value_a, value_b):
+ """
+ Always true, as long as the test is passed.
+ """
+
+def compare_tolerance(tolerance):
+ def _compare_tolerance(value_a, value_b):
+ if abs(value_a - value_b) > tolerance:
+ raise ComparisonError("%s is not %s (tolerance: %s)" % (
+ value_a,
+ value_b,
+ tolerance
+ ))
+ return _compare_tolerance
+
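+# Usage sketch: a test decorated with @comparer(compare_tolerance(0.05))
+# treats values within 0.05 of each other as equal across revisions.
+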
+def compare_files(file_a, file_b):
+ size_a = os.path.getsize(file_a)
+ size_b = os.path.getsize(file_b)
+
+ if size_a != size_b:
+ raise ComparisonError("%s is not the same size as %s" % (
+ file_a,
+ file_b
+ ))
+
+    BUFFER_SIZE = 8192
+
+ offset = 0
+
+    # open in binary mode so the comparison is byte-exact on all platforms
+    with open(file_a, "rb") as f_a:
+        with open(file_b, "rb") as f_b:
+ for chunk_a, chunk_b in izip(
+ imap(
+ lambda i: f_a.read(BUFFER_SIZE),
+ xrange(size_a // BUFFER_SIZE + 1)
+ ),
+ imap(
+ lambda i: f_b.read(BUFFER_SIZE),
+ xrange(size_b // BUFFER_SIZE + 1)
+ )
+ ):
+ chunk_size = len(chunk_a)
+
+ if chunk_a != chunk_b:
+ for i in xrange(chunk_size):
+ if chunk_a[i] != chunk_b[i]:
+ raise ComparisonError("%s differs from %s at offset %d" % (
+ file_a,
+ file_b,
+ offset + i
+ ))
+
+ offset += chunk_size
--- /dev/null
+"""
+All of Digress's constants.
+"""
+
+TEST_PASS = 0
+TEST_FAIL = 1
+TEST_DISABLED = 2
+TEST_SKIPPED = 3
+
+CASE_PASS = 0
+CASE_FAIL = 1
+
+FIXTURE_PASS = 0
+FIXTURE_FAIL = 1
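+
+# These codes appear in the result dictionaries that digress.testing builds:
+#   test:    { "status": TEST_*,    "value": ..., "time": ... }
+#   case:    { "status": CASE_*,    "tests": { name: test result, ... }, "time": ... }
+#   fixture: { "status": FIXTURE_*, "cases": { name: case result, ... }, "time": ..., "revision": ... }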
--- /dev/null
+"""
+Digress errors.
+"""
+
+class DigressError(Exception):
+ """
+ Digress error base class.
+ """
+
+class NoSuchTestError(DigressError):
+ """
+ Raised when no such test exists.
+ """
+
+class DisabledTestError(DigressError):
+ """
+ Test is disabled.
+ """
+
+class SkippedTestError(DigressError):
+ """
+ Test is marked as skipped.
+ """
+
+class DisabledCaseError(DigressError):
+ """
+ Case is marked as disabled.
+ """
+
+class SkippedCaseError(DigressError):
+ """
+ Case is marked as skipped.
+ """
+
+class FailedTestError(DigressError):
+ """
+ Test failed.
+ """
+
+class ComparisonError(DigressError):
+ """
+ Comparison failed.
+ """
+
+class IncomparableError(DigressError):
+ """
+ Values cannot be compared.
+ """
+
+class AlreadyRunError(DigressError):
+ """
+ Test/case has already been run.
+ """
+
+class SCMError(DigressError):
+ """
+ Error occurred in SCM.
+ """
+ def __init__(self, message):
+ self.message = message.replace("\n", " ")
+
+ def __str__(self):
+ return self.message
--- /dev/null
+"""
+Source control backends for Digress.
+"""
--- /dev/null
+"""
+Dummy SCM backend for Digress.
+"""
+
+from random import random
+
+def checkout(revision):
+ """
+ Checkout a revision.
+ """
+ pass
+
+def current_rev():
+ """
+    Get the current revision.
+ """
+ return str(random())
+
+def revisions(rev_a, rev_b):
+ """
+ Get a list of revisions from one to another.
+ """
+    # the dummy backend has no history; return an empty revision list
+    return []
+
+def stash():
+ """
+ Stash the repository.
+ """
+ pass
+
+def unstash():
+ """
+ Unstash the repository.
+ """
+ pass
+
+def bisect(command, revision):
+ """
+ Perform a bisection.
+ """
+ raise NotImplementedError("dummy SCM backend does not support bisection")
--- /dev/null
+"""
+Git SCM backend for Digress.
+"""
+
+from subprocess import Popen, PIPE, STDOUT
+from digress.errors import SCMError
+
+def checkout(revision):
+ """
+ Checkout a revision from git.
+ """
+ proc = Popen([
+ "git",
+ "checkout",
+ "-f",
+ revision
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = proc.communicate()[0].strip()
+ if proc.returncode != 0:
+ raise SCMError("checkout error: %s" % output)
+
+def rev_parse(ref):
+ proc = Popen([
+ "git",
+ "rev-parse",
+ ref
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = proc.communicate()[0].strip()
+ if proc.returncode != 0:
+ raise SCMError("rev-parse error: %s" % output)
+ return output
+
+def current_rev():
+ """
+ Get the current revision.
+ """
+ return rev_parse("HEAD")
+
+def revisions(rev_a, rev_b):
+ """
+ Get a list of revisions from one to another.
+ """
+ proc = Popen([
+ "git",
+ "log",
+ "--format=%H", ("%s...%s" % (rev_a, rev_b))
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = proc.communicate()[0].strip()
+ if proc.returncode != 0:
+ raise SCMError("log error: %s" % output)
+ return output.split("\n")
+
+def stash():
+ """
+ Stash the repository.
+ """
+ proc = Popen([
+ "git",
+ "stash",
+ "save",
+ "--keep-index"
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = proc.communicate()[0].strip()
+ if proc.returncode != 0:
+ raise SCMError("stash error: %s" % output)
+
+def unstash():
+ """
+ Unstash the repository.
+ """
+ proc = Popen(["git", "stash", "pop"], stdout=PIPE, stderr=STDOUT)
+ proc.communicate()
+
+def bisect(*args):
+ """
+ Perform a bisection.
+ """
+ proc = Popen((["git", "bisect"] + list(args)), stdout=PIPE, stderr=STDOUT)
+ output = proc.communicate()[0]
+ if proc.returncode != 0:
+ raise SCMError("bisect error: %s" % output)
+ return output
+
+def dirty():
+ """
+ Check if the working tree is dirty.
+ """
+ proc = Popen(["git", "status"], stdout=PIPE, stderr=STDOUT)
+ output = proc.communicate()[0].strip()
+ if proc.returncode != 0:
+ raise SCMError("status error: %s" % output)
+ if "modified:" in output:
+ return True
+ else:
+ return False
--- /dev/null
+"""
+Digress testing core.
+"""
+
+from digress.errors import SkippedTestError, DisabledTestError, NoSuchTestError, \
+ FailedTestError, AlreadyRunError, SCMError, \
+ ComparisonError
+from digress.constants import *
+from digress.cli import dispatchable
+
+import inspect
+import operator
+import os
+import json
+
+import textwrap
+
+from shutil import rmtree
+
+from time import time
+from functools import wraps
+
+from itertools import izip_longest
+
+class depends(object):
+ """
+ Dependency decorator for a test.
+ """
+ def __init__(self, *test_names):
+ self.test_names = test_names
+
+ def __call__(self, func):
+ func.digress_depends = self.test_names
+ return func
+
+class _skipped(object):
+ """
+ Internal skipped decorator.
+ """
+ def __init__(self, reason=""):
+ self._reason = reason
+
+ def __call__(self, func):
+ @wraps(func)
+ def _closure(*args):
+ raise SkippedTestError(self._reason)
+ return _closure
+
+class disabled(object):
+ """
+ Disable a test, with reason.
+ """
+ def __init__(self, reason=""):
+ self._reason = reason
+
+ def __call__(self, func):
+ @wraps(func)
+ def _closure(*args):
+ raise DisabledTestError(self._reason)
+ return _closure
+
+class comparer(object):
+ """
+ Set the comparer for a test.
+ """
+ def __init__(self, comparer_):
+ self._comparer = comparer_
+
+ def __call__(self, func):
+ func.digress_comparer = self._comparer
+ return func
+
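+# Decorator usage sketch (hypothetical case; compare_tolerance comes from
+# digress.comparers, measure_fps is an assumed helper):
+#
+#     class Speed(Case):
+#         def test_encode(self):
+#             ...
+#
+#         @depends("encode")
+#         @comparer(compare_tolerance(0.1))
+#         def test_fps(self):
+#             return measure_fps()
+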
+class Fixture(object):
+ cases = []
+ scm = None
+
+ flush_before = False
+
+ def _skip_case(self, case, depend):
+ for name, meth in inspect.getmembers(case):
+ if name[:5] == "test_":
+ setattr(
+ case,
+ name,
+ _skipped("failed dependency: case %s" % depend)(meth)
+ )
+
+ def _run_case(self, case, results):
+ if case.__name__ in results:
+ raise AlreadyRunError
+
+ for depend in case.depends:
+ if depend.__name__ in results and results[depend.__name__]["status"] != CASE_PASS:
+ self._skip_case(case, depend.__name__)
+
+ try:
+ result = self._run_case(depend, results)
+ except AlreadyRunError:
+ continue
+
+ if result["status"] != CASE_PASS:
+ self._skip_case(case, depend.__name__)
+
+ result = case().run()
+ results[case.__name__] = result
+ return result
+
+ @dispatchable
+ def flush(self, revision=None):
+ """
+        Flush any cached results. Takes a revision as an optional argument;
+        with no revision, all cached results are flushed.
+ """
+ if not revision:
+ print "Flushing all cached results...",
+
+ try:
+ rmtree(".digress_%s" % self.__class__.__name__)
+ except Exception, e:
+ print "failed: %s" % e
+ else:
+ print "done."
+ else:
+ try:
+ rev = self.scm.rev_parse(revision)
+ except SCMError, e:
+ print e
+ else:
+ print "Flushing cached results for %s..." % rev,
+
+ try:
+ rmtree(os.path.join(".digress_%s" % self.__class__.__name__, rev))
+ except Exception, e:
+ print "failed: %s" % e
+ else:
+ print "done."
+
+ @dispatchable
+ def run(self, revision=None):
+ """
+        Run the fixture for a specified revision.
+
+        Takes a revision as an optional argument; defaults to HEAD when the
+        working tree is clean, and runs against the dirty tree otherwise.
+ """
+ oldrev = None
+ dirty = False
+
+ try:
+ dirty = self.scm.dirty()
+
+            # if the tree is clean and no revision was given, test HEAD
+ if not dirty and revision is None: revision = "HEAD"
+
+ if revision:
+ oldrev = self.scm.current_rev()
+
+ if dirty:
+ self.scm.stash()
+ self.scm.checkout(revision)
+
+ rev = self.scm.current_rev()
+
+ self.datastore = os.path.join(".digress_%s" % self.__class__.__name__, rev)
+
+                if os.path.isdir(self.datastore):
+                    if self.flush_before:
+                        self.flush(rev)
+                else:
+                    os.makedirs(self.datastore)
+ else:
+ rev = "(dirty working tree)"
+ self.datastore = None
+
+ print "Running fixture %s on revision %s...\n" % (self.__class__.__name__, rev)
+
+ results = {}
+
+ for case in self.cases:
+ try:
+ self._run_case(case, results)
+ except AlreadyRunError:
+ continue
+
+ total_time = reduce(operator.add, filter(
+ None,
+ [
+ result["time"] for result in results.values()
+ ]
+ ), 0)
+
+ overall_status = (
+ CASE_FAIL in [ result["status"] for result in results.values() ]
+ ) and FIXTURE_FAIL or FIXTURE_PASS
+
+ print "Fixture %s in %.4f.\n" % (
+ (overall_status == FIXTURE_PASS) and "passed" or "failed",
+ total_time
+ )
+
+ return { "cases" : results, "time" : total_time, "status" : overall_status, "revision" : rev }
+
+ finally:
+ if oldrev:
+ self.scm.checkout(oldrev)
+ if dirty:
+ self.scm.unstash()
+
+ @dispatchable
+ def bisect(self, good_rev, bad_rev=None):
+ """
+ Perform a bisection between two revisions.
+
+ First argument is the good revision, second is the bad revision, which
+ defaults to the current revision.
+ """
+ if not bad_rev: bad_rev = self.scm.current_rev()
+
+ dirty = False
+
+ # get a set of results for the good revision
+ good_result = self.run(good_rev)
+
+ good_rev = good_result["revision"]
+
+ try:
+ dirty = self.scm.dirty()
+
+ if dirty:
+ self.scm.stash()
+
+ self.scm.bisect("start")
+
+ self.scm.bisect("bad", bad_rev)
+ self.scm.bisect("good", good_rev)
+
+ bisecting = True
+ isbad = False
+
+ while bisecting:
+ results = self.run(self.scm.current_rev())
+ revision = results["revision"]
+
+ # perform comparisons
+ # FIXME: this just uses a lot of self.compare
+ for case_name, case_result in good_result["cases"].iteritems():
+ case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
+
+ for test_name, test_result in case_result["tests"].iteritems():
+ test = filter(
+ lambda pair: pair[0] == "test_%s" % test_name,
+ inspect.getmembers(case)
+ )[0][1]
+
+ other_result = results["cases"][case_name]["tests"][test_name]
+
+                        if other_result["status"] == TEST_FAIL and test_result["status"] != TEST_FAIL:
+ print "Revision %s failed %s.%s." % (revision, case_name, test_name)
+ isbad = True
+ break
+
+ elif hasattr(test, "digress_comparer"):
+ try:
+ test.digress_comparer(test_result["value"], other_result["value"])
+ except ComparisonError, e:
+ print "%s differs: %s" % (test_name, e)
+ isbad = True
+ break
+
+ if isbad:
+ output = self.scm.bisect("bad", revision)
+ print "Marking revision %s as bad." % revision
+ else:
+ output = self.scm.bisect("good", revision)
+ print "Marking revision %s as good." % revision
+
+ if output.split("\n")[0].endswith("is the first bad commit"):
+ print "\nBisection complete.\n"
+ print output
+ bisecting = False
+
+ print ""
+ except SCMError, e:
+ print e
+ finally:
+ self.scm.bisect("reset")
+
+ if dirty:
+ self.scm.unstash()
+
+ @dispatchable
+ def multicompare(self, rev_a=None, rev_b=None, mode="waterfall"):
+ """
+        Generate a comparison table of test results across a range of revisions.
+
+        Takes three optional arguments: the revision to compare from, the
+        revision to compare to, and the display mode (defaults to the vertical
+        "waterfall"; "river" gives a horizontal layout).
+ """
+ if not rev_a: rev_a = self.scm.current_rev()
+ if not rev_b: rev_b = self.scm.current_rev()
+
+ revisions = self.scm.revisions(rev_a, rev_b)
+
+ results = []
+
+ for revision in revisions:
+ results.append(self.run(revision))
+
+ test_names = reduce(operator.add, [
+ [
+ (case_name, test_name)
+ for
+ test_name, test_result
+ in
+ case_result["tests"].iteritems()
+ ]
+ for
+ case_name, case_result
+ in
+ results[0]["cases"].iteritems()
+ ], [])
+
+ MAXLEN = 20
+
+ colfmt = "| %s "
+
+ table = []
+
+ if mode not in ("waterfall", "river"):
+ mode = "waterfall"
+
+ print "Unknown multicompare mode specified, defaulting to %s." % mode
+
+ if mode == "waterfall":
+ header = [ "Test" ]
+
+ for result in results:
+ header.append(result["revision"])
+
+ table.append(header)
+
+ for test_name in test_names:
+ row_data = [ ".".join(test_name) ]
+
+ for result in results:
+ test_result = result["cases"][test_name[0]]["tests"][test_name[1]]
+
+ if test_result["status"] != TEST_PASS:
+ value = "did not pass: %s" % (test_result["value"])
+ else:
+ value = "%s (%.4f)" % (test_result["value"], test_result["time"])
+
+ row_data.append(value)
+
+ table.append(row_data)
+
+ elif mode == "river":
+ header = [ "Revision" ]
+
+ for test_name in test_names:
+ header.append(".".join(test_name))
+
+ table.append(header)
+
+ for result in results:
+ row_data = [ result["revision"] ]
+
+ for case_name, case_result in result["cases"].iteritems():
+ for test_name, test_result in case_result["tests"].iteritems():
+
+ if test_result["status"] != TEST_PASS:
+ value = "did not pass: %s" % (test_result["value"])
+ else:
+ value = "%s (%.4f)" % (test_result["value"], test_result["time"])
+
+ row_data.append(value)
+
+ table.append(row_data)
+
+ breaker = "=" * (len(colfmt % "".center(MAXLEN)) * len(table[0]) + 1)
+
+ print breaker
+
+ for row in table:
+ for row_stuff in izip_longest(*[
+ textwrap.wrap(col, MAXLEN, break_on_hyphens=False) for col in row
+ ], fillvalue=""):
+ row_output = ""
+
+ for col in row_stuff:
+ row_output += colfmt % col.ljust(MAXLEN)
+
+ row_output += "|"
+
+ print row_output
+ print breaker
+
+ @dispatchable
+ def compare(self, rev_a, rev_b=None):
+ """
+ Compare two revisions directly.
+
+        Takes two arguments; the second is optional and defaults to the
+        current revision.
+ """
+ results_a = self.run(rev_a)
+ results_b = self.run(rev_b)
+
+ for case_name, case_result in results_a["cases"].iteritems():
+ case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
+
+ header = "Comparison of case %s" % case_name
+ print header
+ print "=" * len(header)
+
+ for test_name, test_result in case_result["tests"].iteritems():
+ test = filter(
+ lambda pair: pair[0] == "test_%s" % test_name,
+ inspect.getmembers(case)
+ )[0][1]
+
+ other_result = results_b["cases"][case_name]["tests"][test_name]
+
+ if test_result["status"] != TEST_PASS or other_result["status"] != TEST_PASS:
+ print "%s cannot be compared as one of the revisions have not passed it." % test_name
+
+ elif hasattr(test, "digress_comparer"):
+ try:
+ test.digress_comparer(test_result["value"], other_result["value"])
+ except ComparisonError, e:
+ print "%s differs: %s" % (test_name, e)
+ else:
+ print "%s does not differ." % test_name
+ else:
+ print "%s has no comparer and therefore cannot be compared." % test_name
+
+ print ""
+
+ @dispatchable
+ def list(self):
+ """
+ List all available test cases, excluding dependencies.
+ """
+ print "\nAvailable Test Cases"
+ print "===================="
+ for case in self.cases:
+ print case.__name__
+
+ def register_case(self, case):
+ case.fixture = self
+ self.cases.append(case)
+
+class Case(object):
+ depends = []
+ fixture = None
+
+ def _get_test_by_name(self, test_name):
+ if not hasattr(self, "test_%s" % test_name):
+ raise NoSuchTestError(test_name)
+ return getattr(self, "test_%s" % test_name)
+
+ def _run_test(self, test, results):
+ test_name = test.__name__[5:]
+
+ if test_name in results:
+ raise AlreadyRunError
+
+ if hasattr(test, "digress_depends"):
+ for depend in test.digress_depends:
+ if depend in results and results[depend]["status"] != TEST_PASS:
+ test = _skipped("failed dependency: %s" % depend)(test)
+
+ dependtest = self._get_test_by_name(depend)
+
+ try:
+ result = self._run_test(dependtest, results)
+ except AlreadyRunError:
+ continue
+
+ if result["status"] != TEST_PASS:
+ test = _skipped("failed dependency: %s" % depend)(test)
+
+ start_time = time()
+ run_time = None
+
+ print "Running test %s..." % test_name,
+
+ try:
+ if not self.datastore:
+                # XXX: this smells funny -- abuse IOError to fall through to
+                # the uncached path below
+                raise IOError
+
+ with open(os.path.join(self.datastore, "%s.json" % test_name), "r") as f:
+ result = json.load(f)
+
+ value = str(result["value"])
+
+ if result["status"] == TEST_DISABLED:
+ status = "disabled"
+ elif result["status"] == TEST_SKIPPED:
+ status = "skipped"
+ elif result["status"] == TEST_FAIL:
+ status = "failed"
+ elif result["status"] == TEST_PASS:
+ status = "passed"
+ value = "%s (in %.4f)" % (
+ result["value"] or "(no result)",
+ result["time"]
+ )
+ else:
+ status = "???"
+
+ print "%s (cached): %s" % (status, value)
+ except IOError:
+ try:
+ value = test()
+ except DisabledTestError, e:
+ print "disabled: %s" % e
+ status = TEST_DISABLED
+ value = str(e)
+ except SkippedTestError, e:
+ print "skipped: %s" % e
+ status = TEST_SKIPPED
+ value = str(e)
+ except FailedTestError, e:
+ print "failed: %s" % e
+ status = TEST_FAIL
+ value = str(e)
+ except Exception, e:
+ print "failed with exception: %s" % e
+ status = TEST_FAIL
+ value = str(e)
+ else:
+ run_time = time() - start_time
+ print "passed: %s (in %.4f)" % (
+ value or "(no result)",
+ run_time
+ )
+ status = TEST_PASS
+
+ result = { "status" : status, "value" : value, "time" : run_time }
+
+ if self.datastore:
+ with open(os.path.join(self.datastore, "%s.json" % test_name), "w") as f:
+ json.dump(result, f)
+
+ results[test_name] = result
+ return result
+
+ def run(self):
+ print "Running case %s..." % self.__class__.__name__
+
+ if self.fixture.datastore:
+ self.datastore = os.path.join(self.fixture.datastore, self.__class__.__name__)
+ if not os.path.isdir(self.datastore):
+ os.makedirs(self.datastore)
+ else:
+ self.datastore = None
+
+ results = {}
+
+ for name, meth in inspect.getmembers(self):
+ if name[:5] == "test_":
+ try:
+ self._run_test(meth, results)
+ except AlreadyRunError:
+ continue
+
+ total_time = reduce(operator.add, filter(
+ None, [
+ result["time"] for result in results.values()
+ ]
+ ), 0)
+
+ overall_status = (
+ TEST_FAIL in [ result["status"] for result in results.values() ]
+ ) and CASE_FAIL or CASE_PASS
+
+ print "Case %s in %.4f.\n" % (
+ (overall_status == FIXTURE_PASS) and "passed" or "failed",
+ total_time
+ )
+
+ return { "tests" : results, "time" : total_time, "status" : overall_status }
--- /dev/null
+#!/usr/bin/env python
+
+import operator
+
+from optparse import OptionGroup
+
+import sys
+
+from time import time
+
+from digress.cli import Dispatcher as _Dispatcher
+from digress.errors import ComparisonError, FailedTestError, DisabledTestError
+from digress.testing import depends, comparer, Fixture, Case
+from digress.comparers import compare_pass
+from digress.scm import git as x264git
+
+from subprocess import Popen, PIPE, STDOUT
+
+import os
+import re
+import shlex
+import inspect
+
+from random import randrange, seed
+from math import ceil
+
+from itertools import imap, izip
+
+os.chdir(os.path.join(os.path.dirname(__file__), ".."))
+
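+# Typical invocations (revisions are illustrative):
+#
+#     ./test_x264.py list
+#     ./test_x264.py run HEAD~10
+#     ./test_x264.py compare HEAD~5 HEAD
+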
+# options
+
+OPTIONS = [
+ [ "--tune %s" % t for t in ("film", "zerolatency") ],
+ ("", "--intra-refresh"),
+ ("", "--no-cabac"),
+ [ "--preset %s" % p for p in ("ultrafast",
+ "superfast",
+ "veryfast",
+ "faster",
+ "fast",
+ "medium",
+ "slow",
+ "slower") ]
+]
+
+# end options
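+
+# Each generated command line draws one entry from every group above (see
+# _generate_random_commandline below), e.g.:
+#
+#     --tune film --no-cabac --preset slow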
+
+def compare_yuv_output(width, height):
+ def _compare_yuv_output(file_a, file_b):
+ size_a = os.path.getsize(file_a)
+ size_b = os.path.getsize(file_b)
+
+ if size_a != size_b:
+ raise ComparisonError("%s is not the same size as %s" % (
+ file_a,
+ file_b
+ ))
+
+        BUFFER_SIZE = 8192
+
+ offset = 0
+
+        # open in binary mode so the comparison is byte-exact on all platforms
+        with open(file_a, "rb") as f_a:
+            with open(file_b, "rb") as f_b:
+ for chunk_a, chunk_b in izip(
+ imap(
+ lambda i: f_a.read(BUFFER_SIZE),
+ xrange(size_a // BUFFER_SIZE + 1)
+ ),
+ imap(
+ lambda i: f_b.read(BUFFER_SIZE),
+ xrange(size_b // BUFFER_SIZE + 1)
+ )
+ ):
+ chunk_size = len(chunk_a)
+
+ if chunk_a != chunk_b:
+ for i in xrange(chunk_size):
+ if chunk_a[i] != chunk_b[i]:
+ # calculate the macroblock, plane and frame from the offset
+ offs = offset + i
+
+                            # planar 4:2:0: the U and V planes are each a
+                            # quarter of the Y plane
+                            y_plane_area = width * height
+                            u_plane_area = y_plane_area + y_plane_area // 4
+                            v_plane_area = u_plane_area + y_plane_area // 4
+
+                            pixel = offs % v_plane_area
+                            frame = offs // v_plane_area
+
+ if pixel < y_plane_area:
+ plane = "Y"
+
+ pixel_x = pixel % width
+ pixel_y = pixel // width
+
+ macroblock = (ceil(pixel_x / 16.0), ceil(pixel_y / 16.0))
+ elif pixel < u_plane_area:
+ plane = "U"
+
+ pixel -= y_plane_area
+
+ pixel_x = pixel % width
+ pixel_y = pixel // width
+
+ macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
+ else:
+ plane = "V"
+
+ pixel -= u_plane_area
+
+ pixel_x = pixel % width
+ pixel_y = pixel // width
+
+ macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
+
+ macroblock = tuple([ int(x) for x in macroblock ])
+
+ raise ComparisonError("%s differs from %s at frame %d, " \
+ "macroblock %s on the %s plane (offset %d)" % (
+ file_a,
+ file_b,
+ frame,
+ macroblock,
+ plane,
+ offs)
+ )
+
+ offset += chunk_size
+
+ return _compare_yuv_output
+
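+# poor man's which(1): return the path to program if it is executable,
+# otherwise None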
+def program_exists(program):
+ def is_exe(fpath):
+ return os.path.exists(fpath) and os.access(fpath, os.X_OK)
+
+ fpath, fname = os.path.split(program)
+
+ if fpath:
+ if is_exe(program):
+ return program
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file):
+ return exe_file
+
+ return None
+
+class x264(Fixture):
+ scm = x264git
+
+class Compile(Case):
+ @comparer(compare_pass)
+ def test_configure(self):
+ Popen([
+ "make",
+ "distclean"
+ ], stdout=PIPE, stderr=STDOUT).communicate()
+
+ configure_proc = Popen([
+ "./configure"
+ ] + self.fixture.dispatcher.configure, stdout=PIPE, stderr=STDOUT)
+
+ output = configure_proc.communicate()[0]
+ if configure_proc.returncode != 0:
+ raise FailedTestError("configure failed: %s" % output.replace("\n", " "))
+
+ @depends("configure")
+ @comparer(compare_pass)
+ def test_make(self):
+ make_proc = Popen([
+ "make",
+ "-j5"
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = make_proc.communicate()[0]
+ if make_proc.returncode != 0:
+ raise FailedTestError("make failed: %s" % output.replace("\n", " "))
+
+_dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")
+
+def _YUVOutputComparisonFactory():
+ class YUVOutputComparison(Case):
+ _dimension_pattern = _dimension_pattern
+
+ depends = [ Compile ]
+ options = []
+
+ def __init__(self):
+ for name, meth in inspect.getmembers(self):
+ if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
+ delattr(self.__class__, name)
+
+ def _run_x264(self):
+ x264_proc = Popen([
+ "./x264",
+ "-o",
+ "%s.264" % self.fixture.dispatcher.video,
+ "--dump-yuv",
+ "x264-output.yuv"
+ ] + self.options + [
+ self.fixture.dispatcher.video
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = x264_proc.communicate()[0]
+ if x264_proc.returncode != 0:
+ raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+            matches = _dimension_pattern.search(output)
+            if not matches:
+                raise FailedTestError("could not parse dimensions from x264 output")
+
+            return (int(matches.group(1)), int(matches.group(2)))
+
+ @comparer(compare_pass)
+ def test_jm(self):
+ if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")
+
+ try:
+ runres = self._run_x264()
+
+ jm_proc = Popen([
+ "ldecod",
+ "-i",
+ "%s.264" % self.fixture.dispatcher.video,
+ "-o",
+ "jm-output.yuv"
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = jm_proc.communicate()[0]
+ if jm_proc.returncode != 0:
+ raise FailedTestError("jm did not complete properly: %s" % output.replace("\n", " "))
+
+ try:
+ compare_yuv_output(*runres)("x264-output.yuv", "jm-output.yuv")
+ except ComparisonError, e:
+ raise FailedTestError(e)
+ finally:
+ try: os.remove("x264-output.yuv")
+ except: pass
+
+ try: os.remove("%s.264" % self.fixture.dispatcher.video)
+ except: pass
+
+ try: os.remove("jm-output.yuv")
+ except: pass
+
+ try: os.remove("log.dec")
+ except: pass
+
+ try: os.remove("dataDec.txt")
+ except: pass
+
+ @comparer(compare_pass)
+ def test_ffmpeg(self):
+ if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")
+ try:
+ runres = self._run_x264()
+
+ ffmpeg_proc = Popen([
+ "ffmpeg",
+ "-i",
+ "%s.264" % self.fixture.dispatcher.video,
+ "ffmpeg-output.yuv"
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = ffmpeg_proc.communicate()[0]
+ if ffmpeg_proc.returncode != 0:
+ raise FailedTestError("ffmpeg did not complete properly: %s" % output.replace("\n", " "))
+
+ try:
+ compare_yuv_output(*runres)("x264-output.yuv", "ffmpeg-output.yuv")
+ except ComparisonError, e:
+ raise FailedTestError(e)
+ finally:
+ try: os.remove("x264-output.yuv")
+ except: pass
+
+ try: os.remove("%s.264" % self.fixture.dispatcher.video)
+ except: pass
+
+ try: os.remove("ffmpeg-output.yuv")
+ except: pass
+
+ return YUVOutputComparison
+
+class Regression(Case):
+ depends = [ Compile ]
+
+ _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
+ _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")
+
+ def __init__(self):
+        if self.fixture.dispatcher.x264:
+            suffix = " %s" % " ".join(self.fixture.dispatcher.x264)
+            # the case is instantiated once per run; only append the extra
+            # x264 flags to the name the first time
+            if not self.__class__.__name__.endswith(suffix):
+                self.__class__.__name__ += suffix
+
+ def test_psnr(self):
+ try:
+ x264_proc = Popen([
+ "./x264",
+ "-o",
+ "%s.264" % self.fixture.dispatcher.video,
+ "--psnr"
+ ] + self.fixture.dispatcher.x264 + [
+ self.fixture.dispatcher.video
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = x264_proc.communicate()[0]
+
+ if x264_proc.returncode != 0:
+ raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+ for line in output.split("\n"):
+ if line.startswith("x264 [info]: PSNR Mean"):
+ return float(self._psnr_pattern.match(line).group(1))
+
+ raise FailedTestError("no PSNR output caught from x264")
+ finally:
+ try: os.remove("%s.264" % self.fixture.dispatcher.video)
+ except: pass
+
+ def test_ssim(self):
+ try:
+ x264_proc = Popen([
+ "./x264",
+ "-o",
+ "%s.264" % self.fixture.dispatcher.video,
+ "--ssim"
+ ] + self.fixture.dispatcher.x264 + [
+ self.fixture.dispatcher.video
+ ], stdout=PIPE, stderr=STDOUT)
+
+ output = x264_proc.communicate()[0]
+
+ if x264_proc.returncode != 0:
+ raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
+
+ for line in output.split("\n"):
+ if line.startswith("x264 [info]: SSIM Mean"):
+ return float(self._ssim_pattern.match(line).group(1))
+
+ raise FailedTestError("no PSNR output caught from x264")
+ finally:
+ try: os.remove("%s.264" % self.fixture.dispatcher.video)
+ except: pass
+
+def _generate_random_commandline():
+ commandline = []
+
+ for suboptions in OPTIONS:
+ commandline.append(suboptions[randrange(0, len(suboptions))])
+
+ return filter(None, reduce(operator.add, [ shlex.split(opt) for opt in commandline ]))
+
+_generated = []
+
+fixture = x264()
+fixture.register_case(Compile)
+
+fixture.register_case(Regression)
+
+class Dispatcher(_Dispatcher):
+ video = "akiyo_qcif.y4m"
+ products = 50
+ configure = []
+ x264 = []
+ yuv_tests = [ "jm" ]
+
+ def _populate_parser(self):
+ super(Dispatcher, self)._populate_parser()
+
+        # build a throwaway case class just to enumerate the available yuv tests
+ tcase = _YUVOutputComparisonFactory()
+
+ yuv_tests = [ name[5:] for name, meth in filter(lambda pair: pair[0][:5] == "test_", inspect.getmembers(tcase)) ]
+
+ group = OptionGroup(self.optparse, "x264 testing-specific options")
+
+ group.add_option(
+ "-v",
+ "--video",
+ metavar="FILENAME",
+ action="callback",
+ dest="video",
+ type=str,
+ callback=lambda option, opt, value, parser: setattr(self, "video", value),
+ help="yuv video to perform testing on (default: %s)" % self.video
+ )
+
+ group.add_option(
+ "-s",
+ "--seed",
+ metavar="SEED",
+ action="callback",
+ dest="seed",
+ type=int,
+ callback=lambda option, opt, value, parser: setattr(self, "seed", value),
+ help="seed for the random number generator (default: unix timestamp)"
+ )
+
+ group.add_option(
+ "-p",
+ "--product-tests",
+ metavar="NUM",
+ action="callback",
+ dest="video",
+ type=int,
+ callback=lambda option, opt, value, parser: setattr(self, "products", value),
+ help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
+ )
+
+ group.add_option(
+ "--configure-with",
+ metavar="FLAGS",
+ action="callback",
+ dest="configure",
+ type=str,
+ callback=lambda option, opt, value, parser: setattr(self, "configure", shlex.split(value)),
+ help="options to run ./configure with"
+ )
+
+ group.add_option(
+ "--yuv-tests",
+ action="callback",
+ dest="yuv_tests",
+ type=str,
+ callback=lambda option, opt, value, parser: setattr(self, "yuv_tests", [
+ val.strip() for val in value.split(",")
+ ]),
+ help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
+ ", ".join(self.yuv_tests),
+ ", ".join(yuv_tests)
+ )
+ )
+
+ group.add_option(
+ "--x264-with",
+ metavar="FLAGS",
+ action="callback",
+ dest="x264",
+ type=str,
+ callback=lambda option, opt, value, parser: setattr(self, "x264", shlex.split(value)),
+ help="additional options to run ./x264 with"
+ )
+
+ self.optparse.add_option_group(group)
+
+ def pre_dispatch(self):
+ if not hasattr(self, "seed"):
+ self.seed = int(time())
+
+ print "Using seed: %d" % self.seed
+ seed(self.seed)
+
+ for i in xrange(self.products):
+ YUVOutputComparison = _YUVOutputComparisonFactory()
+
+ commandline = _generate_random_commandline()
+
+ counter = 0
+
+ while commandline in _generated:
+ counter += 1
+ commandline = _generate_random_commandline()
+
+ if counter > 100:
+ print >>sys.stderr, "Maximum command-line regeneration exceeded. " \
+ "Try a different seed or specify fewer products to generate."
+ sys.exit(1)
+
+ commandline += self.x264
+
+ _generated.append(commandline)
+
+ YUVOutputComparison.options = commandline
+ YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(commandline)))
+
+ fixture.register_case(YUVOutputComparison)
+
+Dispatcher(fixture).dispatch()