5 from optparse import OptionGroup
11 from digress.cli import Dispatcher as _Dispatcher
12 from digress.errors import ComparisonError, FailedTestError, DisabledTestError
13 from digress.testing import depends, comparer, Fixture, Case
14 from digress.comparers import compare_pass
15 from digress.scm import git as x264git
17 from subprocess import Popen, PIPE, STDOUT
24 from random import randrange, seed
27 from itertools import imap, izip
# NOTE(review): this listing keeps each line's original line number as a prefix
# and omits some lines, so the statements below are fragments of a longer file.
# Switch the working directory to the parent of this script's directory, so the
# relative file names used later (e.g. "x264-output.yuv") resolve against it.
29 os.chdir(os.path.join(os.path.dirname(__file__), ".."))
# The entries below are groups of alternative x264 command-line switches.
# Presumably they belong to the OPTIONS table that _generate_random_commandline
# iterates over -- the opening line of that table is not visible; confirm.
# An empty string in a pair presumably stands for "leave this switch out".
34 [ "--tune %s" % t for t in ("film", "zerolatency") ],
35 ("", "--intra-refresh"),
38 ("", "--slice-max-size 1000"),
39 ("", "--frame-packing 5"),
40 [ "--preset %s" % p for p in ("ultrafast",
# Build and return a comparer for two raw YUV dumps whose frames are
# `width` x `height` pixels. The dimensions are only used to describe where a
# mismatch was found (frame, plane, macroblock).
52 def compare_yuv_output(width, height):
# Inner comparer: takes two file paths and raises ComparisonError when they
# differ (size message just below, byte-mismatch message further down).
53 def _compare_yuv_output(file_a, file_b):
54 size_a = os.path.getsize(file_a)
55 size_b = os.path.getsize(file_b)
# NOTE(review): the condition guarding this raise is on a line missing from this
# listing -- presumably a size_a / size_b inequality check; confirm.
58 raise ComparisonError("%s is not the same size as %s" % (
# NOTE(review): both files are opened without a mode, i.e. in text mode. For raw
# video data "rb" would avoid newline translation on platforms that perform it.
67 with open(file_a) as f_a:
68 with open(file_b) as f_b:
# Walk both files in lockstep, BUFFER_SIZE bytes per read; size // BUFFER_SIZE + 1
# reads are enough to reach the end of each file. BUFFER_SIZE itself is assigned
# on a line not shown here.
69 for chunk_a, chunk_b in izip(
71 lambda i: f_a.read(BUFFER_SIZE),
72 xrange(size_a // BUFFER_SIZE + 1)
75 lambda i: f_b.read(BUFFER_SIZE),
76 xrange(size_b // BUFFER_SIZE + 1)
79 chunk_size = len(chunk_a)
# Only when two chunks differ, scan them byte by byte for a differing index.
81 if chunk_a != chunk_b:
82 for i in xrange(chunk_size):
83 if chunk_a[i] != chunk_b[i]:
84 # calculate the macroblock, plane and frame from the offset
# Cumulative end offsets inside one frame: the first plane takes width*height
# bytes and the next two a quarter of that each, so v_plane_area is the size of a
# whole frame. The 0.25 factors make u_plane_area / v_plane_area floats.
# `offs` is assigned on a line not shown here.
87 y_plane_area = width * height
88 u_plane_area = y_plane_area + y_plane_area * 0.25
89 v_plane_area = u_plane_area + y_plane_area * 0.25
# Position inside the frame, and index of the frame containing the offset.
91 pixel = offs % v_plane_area
92 frame = offs // v_plane_area
# First plane: coordinates are scaled by 16.
94 if pixel < y_plane_area:
97 pixel_x = pixel % width
98 pixel_y = pixel // width
100 macroblock = (ceil(pixel_x / 16.0), ceil(pixel_y / 16.0))
# Second plane: the offset is rebased to the plane start; coordinates scaled by 8.
# NOTE(review): x/y are still derived with the full `width` here and in the
# last branch, although the quarter-size planes implied above would presumably
# be narrower -- confirm the reported chroma coordinates are what is intended.
101 elif pixel < u_plane_area:
104 pixel -= y_plane_area
106 pixel_x = pixel % width
107 pixel_y = pixel // width
109 macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
# Remaining case (its branch header is not visible): rebased past the second plane.
113 pixel -= u_plane_area
115 pixel_x = pixel % width
116 pixel_y = pixel // width
118 macroblock = (ceil(pixel_x / 8.0), ceil(pixel_y / 8.0))
# Coerce both macroblock coordinates to int before they go into the message.
120 macroblock = tuple([ int(x) for x in macroblock ])
122 raise ComparisonError("%s differs from %s at frame %d, " \
123 "macroblock %s on the %s plane (offset %d)" % (
134 return _compare_yuv_output
# Look for an executable called `program`. Callers in this file only test the
# result for truthiness (`if not program_exists(...)`).
# NOTE(review): the nested helper's header, the branch on `fpath` and every
# return statement of the outer function are on lines missing from this listing.
136 def program_exists(program):
# A candidate qualifies when it exists on disk and this process may execute it.
138 return os.path.exists(fpath) and os.access(fpath, os.X_OK)
# Split into directory part and file name; presumably a non-empty directory
# part means `program` is checked as given instead of via PATH -- confirm.
140 fpath, fname = os.path.split(program)
# Build a candidate path for the program under every directory listed in PATH.
146 for path in os.environ["PATH"].split(os.pathsep):
147 exe_file = os.path.join(path, program)
# Build-step test methods. NOTE(review): the enclosing class header is not
# visible in this listing; presumably this is the `Compile` case that the other
# cases list in `depends` -- confirm.
#
# test_configure: runs the configure step with the extra arguments held in
# dispatcher.configure, capturing stdout and stderr together. Raises
# FailedTestError carrying the captured output (newlines flattened to spaces)
# when the process exits with a non-zero status. An earlier process is run to
# completion first; its command line is not visible here.
157 @comparer(compare_pass)
158 def test_configure(self):
162 ], stdout=PIPE, stderr=STDOUT).communicate()
164 configure_proc = Popen([
166 ] + self.fixture.dispatcher.configure, stdout=PIPE, stderr=STDOUT)
168 output = configure_proc.communicate()[0]
169 if configure_proc.returncode != 0:
170 raise FailedTestError("configure failed: %s" % output.replace("\n", " "))
# Second build-step test (its `def` line is missing from this listing). It is
# declared to depend on the "configure" test and fails with FailedTestError,
# again with flattened output, when the make process returns non-zero.
172 @depends("configure")
173 @comparer(compare_pass)
178 ], stdout=PIPE, stderr=STDOUT)
180 output = make_proc.communicate()[0]
181 if make_proc.returncode != 0:
182 raise FailedTestError("make failed: %s" % output.replace("\n", " "))
# Matches an "<word> [info]: <W>x<H>p|i <n>:<n> @ <n>/<n> fps (vfr|cfr)" line.
# Groups 1 and 2 capture the two numbers around "x"; the x264 output parsing
# further down converts them to ints and returns them as a pair.
184 _dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")
# Return a freshly defined YUVOutputComparison case class on every call.
# Callers in this file set `.options` and `.__name__` on the returned class, so
# each generated command line gets its own class object.
186 def _YUVOutputComparisonFactory():
187 class YUVOutputComparison(Case):
# Class-level alias of the module-level pattern of the same name.
188 _dimension_pattern = _dimension_pattern
190 depends = [ Compile ]
# Drop every test_* member whose suffix is not listed in dispatcher.yuv_tests.
# The attribute is deleted from the class, not from the instance. The header of
# the method containing this loop is not visible in this listing.
194 for name, meth in inspect.getmembers(self):
195 if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
196 delattr(self.__class__, name)
# Encoder run (called below as self._run_x264(); its header is not visible).
# Fails with FailedTestError on a non-zero exit status, otherwise returns the
# two integers captured by _dimension_pattern from the combined output.
202 "%s.264" % self.fixture.dispatcher.video,
206 self.fixture.dispatcher.video
207 ], stdout=PIPE, stderr=STDOUT)
209 output = x264_proc.communicate()[0]
210 if x264_proc.returncode != 0:
211 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
# NOTE(review): this uses the module-level pattern and .match(), which only
# matches at the very start of `output`. No None check is visible before
# .group(), so a non-matching output would raise AttributeError -- confirm.
213 matches = _dimension_pattern.match(output)
215 return (int(matches.group(1)), int(matches.group(2)))
# Comparison against the JM decoder (its `def` line is not visible). Disabled
# via DisabledTestError when no "ldecod" executable is found.
217 @comparer(compare_pass)
219 if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")
# runres is the pair returned by the encoder run; it is unpacked into
# compare_yuv_output() below.
222 runres = self._run_x264()
227 "%s.264" % self.fixture.dispatcher.video,
230 ], stdout=PIPE, stderr=STDOUT)
232 output = jm_proc.communicate()[0]
233 if jm_proc.returncode != 0:
234 raise FailedTestError("jm did not complete properly: %s" % output.replace("\n", " "))
# A comparison mismatch is reported as a test failure.
237 compare_yuv_output(*runres)("x264-output.yuv", "jm-output.yuv")
# NOTE(review): `except X, e` is Python 2-only syntax.
238 except ComparisonError, e:
239 raise FailedTestError(e)
# Remove the files produced by this test. The handlers paired with these `try:`
# lines are not visible in this listing.
241 try: os.remove("x264-output.yuv")
244 try: os.remove("%s.264" % self.fixture.dispatcher.video)
247 try: os.remove("jm-output.yuv")
250 try: os.remove("log.dec")
253 try: os.remove("dataDec.txt")
# Same flow as above, with ffmpeg as the reference decoder: disabled when no
# "ffmpeg" executable is found, FailedTestError on a non-zero exit status or on
# a comparison mismatch, then removal of the produced files.
256 @comparer(compare_pass)
257 def test_ffmpeg(self):
258 if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")
260 runres = self._run_x264()
262 ffmpeg_proc = Popen([
266 "%s.264" % self.fixture.dispatcher.video,
268 ], stdout=PIPE, stderr=STDOUT)
270 output = ffmpeg_proc.communicate()[0]
271 if ffmpeg_proc.returncode != 0:
272 raise FailedTestError("ffmpeg did not complete properly: %s" % output.replace("\n", " "))
275 compare_yuv_output(*runres)("x264-output.yuv", "ffmpeg-output.yuv")
276 except ComparisonError, e:
277 raise FailedTestError(e)
279 try: os.remove("x264-output.yuv")
282 try: os.remove("%s.264" % self.fixture.dispatcher.video)
285 try: os.remove("ffmpeg-output.yuv")
288 return YUVOutputComparison
# Case that runs x264 and extracts a quality figure from its "[info]" output.
# Two near-identical code paths are visible: one returns the PSNR value, the
# other the SSIM value. The method headers are missing from this listing.
290 class Regression(Case):
291 depends = [ Compile ]
# Captures the "Global:" figure of the PSNR summary line.
293 _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
# Captures the "Y:" figure of the SSIM summary line.
294 _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")
# When extra x264 options were supplied, append them to the class name.
# NOTE(review): this mutates the class itself, so running it more than once
# would append the options repeatedly -- confirm where this is called from.
297 if self.fixture.dispatcher.x264:
298 self.__class__.__name__ += " %s" % " ".join(self.fixture.dispatcher.x264)
# PSNR path: the user-supplied x264 options are spliced in before the input
# video name; stdout and stderr are captured together.
305 "%s.264" % self.fixture.dispatcher.video,
307 ] + self.fixture.dispatcher.x264 + [
308 self.fixture.dispatcher.video
309 ], stdout=PIPE, stderr=STDOUT)
311 output = x264_proc.communicate()[0]
313 if x264_proc.returncode != 0:
314 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
# Return the captured figure from the first PSNR summary line as a float;
# fail when no such line is found.
316 for line in output.split("\n"):
317 if line.startswith("x264 [info]: PSNR Mean"):
318 return float(self._psnr_pattern.match(line).group(1))
320 raise FailedTestError("no PSNR output caught from x264")
# Remove the encoded stream (the paired handler is not visible here).
322 try: os.remove("%s.264" % self.fixture.dispatcher.video)
# SSIM path: same structure as the PSNR path above.
330 "%s.264" % self.fixture.dispatcher.video,
332 ] + self.fixture.dispatcher.x264 + [
333 self.fixture.dispatcher.video
334 ], stdout=PIPE, stderr=STDOUT)
336 output = x264_proc.communicate()[0]
338 if x264_proc.returncode != 0:
339 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
341 for line in output.split("\n"):
342 if line.startswith("x264 [info]: SSIM Mean"):
343 return float(self._ssim_pattern.match(line).group(1))
# NOTE(review): this is reached when no SSIM line was found, yet the message
# says "PSNR" -- looks like a copy-paste slip; the text should probably say SSIM.
345 raise FailedTestError("no PSNR output caught from x264")
347 try: os.remove("%s.264" % self.fixture.dispatcher.video)
# Assemble one random x264 argument list: pick one entry at random from every
# group in OPTIONS, shlex-split each pick into tokens, and concatenate them.
# NOTE(review): the line that initialises `commandline` is missing from this
# listing. The code relies on Python 2 behaviour: `reduce` is a builtin and
# `filter` returns a list, which callers extend with += and compare with `in`.
350 def _generate_random_commandline():
353 for suboptions in OPTIONS:
354 commandline.append(suboptions[randrange(0, len(suboptions))])
# reduce(operator.add, ...) flattens the per-option token lists into one list;
# filter(None, ...) then drops any empty tokens.
356 return filter(None, reduce(operator.add, [ shlex.split(opt) for opt in commandline ]))
# Register the two fixed cases with the module-level fixture (the line creating
# `fixture` is not visible in this listing). The randomly generated comparison
# cases are registered later, in Dispatcher.pre_dispatch.
361 fixture.register_case(Compile)
363 fixture.register_case(Regression)
# Command-line front end: extends the digress dispatcher with x264-specific
# options and generates the randomised comparison cases before dispatch.
# NOTE(review): further class attributes read below (`products`, `x264`,
# `configure`, `yuv_tests`) are presumably defined on lines missing from this
# listing -- confirm.
365 class Dispatcher(_Dispatcher):
# Default input video.
366 video = "akiyo_qcif.y4m"
# Add the "x264 testing-specific options" group to the option parser. Every
# option callback visible below stores its value as an attribute on this
# dispatcher: video, seed and products verbatim, configure and x264 split with
# shlex, yuv_tests split on commas with whitespace stripped.
372 def _populate_parser(self):
373 super(Dispatcher, self)._populate_parser()
375 # don't do a whole lot with this
376 tcase = _YUVOutputComparisonFactory()
# Names of all test_* members of a freshly built comparison class, with the
# "test_" prefix removed.
378 yuv_tests = [ name[5:] for name, meth in filter(lambda pair: pair[0][:5] == "test_", inspect.getmembers(tcase)) ]
380 group = OptionGroup(self.optparse, "x264 testing-specific options")
389 callback=lambda option, opt, value, parser: setattr(self, "video", value),
390 help="yuv video to perform testing on (default: %s)" % self.video
400 callback=lambda option, opt, value, parser: setattr(self, "seed", value),
401 help="seed for the random number generator (default: unix timestamp)"
411 callback=lambda option, opt, value, parser: setattr(self, "products", value),
412 help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
421 callback=lambda option, opt, value, parser: setattr(self, "configure", shlex.split(value)),
422 help="options to run ./configure with"
430 callback=lambda option, opt, value, parser: setattr(self, "yuv_tests", [
431 val.strip() for val in value.split(",")
433 help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
434 ", ".join(self.yuv_tests),
445 callback=lambda option, opt, value, parser: setattr(self, "x264", shlex.split(value)),
446 help="additional options to run ./x264 with"
449 self.optparse.add_option_group(group)
# Generate and register `products` comparison cases, each with its own random
# command line.
451 def pre_dispatch(self):
# Fall back to the current unix time when no seed was given on the command
# line. The seed is printed; presumably it is also passed to the random module
# on a line not shown here -- confirm.
452 if not hasattr(self, "seed"):
453 self.seed = int(time())
# NOTE(review): Python 2 print statement (also used for the stderr message below).
455 print "Using seed: %d" % self.seed
458 for i in xrange(self.products):
459 YUVOutputComparison = _YUVOutputComparisonFactory()
461 commandline = _generate_random_commandline()
# Regenerate while this command line was already produced. The retry limit that
# leads to the message below is on lines missing from this listing.
# NOTE(review): the lists stored in _generated already had self.x264 appended
# in place (see the += below), while this check compares a fresh list without
# those extras. With non-empty extra options it looks like duplicates would not
# be detected -- confirm.
465 while commandline in _generated:
467 commandline = _generate_random_commandline()
470 print >>sys.stderr, "Maximum command-line regeneration exceeded. " \
471 "Try a different seed or specify fewer products to generate."
# Append the user-supplied x264 options, remember the result, and attach it to
# the new case class (also folding it into the class name) before registering.
474 commandline += self.x264
476 _generated.append(commandline)
478 YUVOutputComparison.options = commandline
479 YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(commandline)))
481 fixture.register_case(YUVOutputComparison)
# Script entry: wrap the module-level fixture in a Dispatcher and run it.
# NOTE(review): no `if __name__ == "__main__":` guard is visible in this
# listing, so this would also run on import -- confirm.
483 Dispatcher(fixture).dispatch()