5 from optparse import OptionGroup
11 from digress.cli import Dispatcher as _Dispatcher
12 from digress.errors import ComparisonError, FailedTestError, DisabledTestError
13 from digress.testing import depends, comparer, Fixture, Case
14 from digress.comparers import compare_pass
15 from digress.scm import git as x264git
17 from subprocess import Popen, PIPE, STDOUT
24 from random import randrange, seed
27 from itertools import imap, izip
29 os.chdir(os.path.join(os.path.dirname(__file__), ".."))
34 [ "--tune %s" % t for t in ("film", "zerolatency") ],
35 ("", "--intra-refresh"),
37 [ "--preset %s" % p for p in ("ultrafast",
def _locate_yuv_offset(offs, width, height):
    """Translate a byte offset in a planar YUV 4:2:0 stream into a location.

    Returns a ``(frame, plane, macroblock)`` tuple where ``plane`` is one of
    ``"Y"``, ``"U"`` or ``"V"`` and ``macroblock`` is a zero-based ``(x, y)``
    pair of integers.

    NOTE(review): assumes 8-bit samples stored as full Y, U and V planes per
    frame, with the chroma planes subsampled by two in both directions
    (rounded up for odd sizes).  For even dimensions this is the same layout
    the previous ``y_plane_area * 0.25`` arithmetic assumed -- confirm for any
    other pixel format.
    """
    # Integer geometry only: the old float arithmetic produced fractional
    # plane sizes and float frame/pixel values.
    chroma_width = (width + 1) // 2
    chroma_height = (height + 1) // 2

    y_plane_area = width * height
    chroma_plane_area = chroma_width * chroma_height
    frame_area = y_plane_area + 2 * chroma_plane_area

    frame, pixel = divmod(offs, frame_area)

    if pixel < y_plane_area:
        plane = "Y"
        row_width = width
        block_size = 16
    elif pixel < y_plane_area + chroma_plane_area:
        plane = "U"
        pixel -= y_plane_area
        # chroma rows are only half as wide as luma rows
        row_width = chroma_width
        block_size = 8
    else:
        plane = "V"
        pixel -= y_plane_area + chroma_plane_area
        row_width = chroma_width
        block_size = 8

    pixel_y, pixel_x = divmod(pixel, row_width)

    # Floor division yields the zero-based block index; ceil() mapped pixel 0
    # to block 0 but pixels 1..16 to block 1.
    return frame, plane, (pixel_x // block_size, pixel_y // block_size)


def compare_yuv_output(width, height):
    """Build a comparer for two raw YUV dumps of ``width`` x ``height`` frames.

    The returned callable takes two file names and returns ``None`` when the
    files are byte-for-byte identical.

    Raises (from the returned callable):
        ComparisonError: if the files differ in size, or at the first
            differing byte, in which case the message names the frame,
            macroblock, plane and absolute offset of the mismatch.
    """
    def _compare_yuv_output(file_a, file_b):
        if os.path.getsize(file_a) != os.path.getsize(file_b):
            raise ComparisonError("%s is not the same size as %s" % (
                file_a,
                file_b
            ))

        BUFFER_SIZE = 8192

        # absolute offset of the start of the current chunk
        offset = 0

        # Binary mode: the files hold raw samples, so no newline translation
        # or text decoding may be applied to them.
        with open(file_a, "rb") as f_a:
            with open(file_b, "rb") as f_b:
                while True:
                    chunk_a = f_a.read(BUFFER_SIZE)
                    chunk_b = f_b.read(BUFFER_SIZE)

                    if not chunk_a and not chunk_b:
                        break

                    if chunk_a != chunk_b:
                        # Find the first differing byte.  If one chunk is a
                        # prefix of the other (a file changed under us), the
                        # mismatch is where the shorter chunk ends.
                        common = min(len(chunk_a), len(chunk_b))
                        first_diff = common
                        for i in range(common):
                            if chunk_a[i] != chunk_b[i]:
                                first_diff = i
                                break

                        offs = offset + first_diff

                        # calculate the macroblock, plane and frame from the offset
                        frame, plane, macroblock = _locate_yuv_offset(
                            offs, width, height
                        )

                        raise ComparisonError("%s differs from %s at frame %d, " \
                                              "macroblock %s on the %s plane (offset %d)" % (
                            file_a,
                            file_b,
                            frame,
                            macroblock,
                            plane,
                            offs
                        ))

                    offset += len(chunk_a)

    return _compare_yuv_output
def program_exists(program):
    """Look for an executable named ``program``.

    If ``program`` contains a directory component it is checked as given;
    otherwise every directory listed in the ``PATH`` environment variable is
    searched in order.

    Returns the path of the executable that was found (a truthy string), or
    ``None`` if there is none.  Callers in this file only test the result for
    truthiness.
    """
    def is_exe(fpath):
        # isfile() rather than exists(): directories normally carry the
        # execute bit too and must not be mistaken for programs.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        return program if is_exe(program) else None

    search_path = os.environ.get("PATH")
    if search_path is None:
        # no PATH at all: nothing to search, and no KeyError either
        return None

    for path in search_path.split(os.pathsep):
        exe_file = os.path.join(path, program)
        if is_exe(exe_file):
            return exe_file

    return None
154 @comparer(compare_pass)
155 def test_configure(self):
159 ], stdout=PIPE, stderr=STDOUT).communicate()
161 configure_proc = Popen([
163 ] + self.fixture.dispatcher.configure, stdout=PIPE, stderr=STDOUT)
165 output = configure_proc.communicate()[0]
166 if configure_proc.returncode != 0:
167 raise FailedTestError("configure failed: %s" % output.replace("\n", " "))
169 @depends("configure")
170 @comparer(compare_pass)
175 ], stdout=PIPE, stderr=STDOUT)
177 output = make_proc.communicate()[0]
178 if make_proc.returncode != 0:
179 raise FailedTestError("make failed: %s" % output.replace("\n", " "))
181 _dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")
183 def _YUVOutputComparisonFactory():
184 class YUVOutputComparison(Case):
185 _dimension_pattern = _dimension_pattern
187 depends = [ Compile ]
191 for name, meth in inspect.getmembers(self):
192 if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
193 delattr(self.__class__, name)
199 "%s.264" % self.fixture.dispatcher.video,
203 self.fixture.dispatcher.video
204 ], stdout=PIPE, stderr=STDOUT)
206 output = x264_proc.communicate()[0]
207 if x264_proc.returncode != 0:
208 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
210 matches = _dimension_pattern.match(output)
212 return (int(matches.group(1)), int(matches.group(2)))
214 @comparer(compare_pass)
216 if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")
219 runres = self._run_x264()
224 "%s.264" % self.fixture.dispatcher.video,
227 ], stdout=PIPE, stderr=STDOUT)
229 output = jm_proc.communicate()[0]
230 if jm_proc.returncode != 0:
231 raise FailedTestError("jm did not complete properly: %s" % output.replace("\n", " "))
234 compare_yuv_output(*runres)("x264-output.yuv", "jm-output.yuv")
235 except ComparisonError, e:
236 raise FailedTestError(e)
238 try: os.remove("x264-output.yuv")
241 try: os.remove("%s.264" % self.fixture.dispatcher.video)
244 try: os.remove("jm-output.yuv")
247 try: os.remove("log.dec")
250 try: os.remove("dataDec.txt")
253 @comparer(compare_pass)
254 def test_ffmpeg(self):
255 if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")
257 runres = self._run_x264()
259 ffmpeg_proc = Popen([
262 "%s.264" % self.fixture.dispatcher.video,
264 ], stdout=PIPE, stderr=STDOUT)
266 output = ffmpeg_proc.communicate()[0]
267 if ffmpeg_proc.returncode != 0:
268 raise FailedTestError("ffmpeg did not complete properly: %s" % output.replace("\n", " "))
271 compare_yuv_output(*runres)("x264-output.yuv", "ffmpeg-output.yuv")
272 except ComparisonError, e:
273 raise FailedTestError(e)
275 try: os.remove("x264-output.yuv")
278 try: os.remove("%s.264" % self.fixture.dispatcher.video)
281 try: os.remove("ffmpeg-output.yuv")
284 return YUVOutputComparison
286 class Regression(Case):
287 depends = [ Compile ]
289 _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
290 _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")
293 if self.fixture.dispatcher.x264:
294 self.__class__.__name__ += " %s" % " ".join(self.fixture.dispatcher.x264)
301 "%s.264" % self.fixture.dispatcher.video,
303 ] + self.fixture.dispatcher.x264 + [
304 self.fixture.dispatcher.video
305 ], stdout=PIPE, stderr=STDOUT)
307 output = x264_proc.communicate()[0]
309 if x264_proc.returncode != 0:
310 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
312 for line in output.split("\n"):
313 if line.startswith("x264 [info]: PSNR Mean"):
314 return float(self._psnr_pattern.match(line).group(1))
316 raise FailedTestError("no PSNR output caught from x264")
318 try: os.remove("%s.264" % self.fixture.dispatcher.video)
326 "%s.264" % self.fixture.dispatcher.video,
328 ] + self.fixture.dispatcher.x264 + [
329 self.fixture.dispatcher.video
330 ], stdout=PIPE, stderr=STDOUT)
332 output = x264_proc.communicate()[0]
334 if x264_proc.returncode != 0:
335 raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))
337 for line in output.split("\n"):
338 if line.startswith("x264 [info]: SSIM Mean"):
339 return float(self._ssim_pattern.match(line).group(1))
341 raise FailedTestError("no PSNR output caught from x264")
343 try: os.remove("%s.264" % self.fixture.dispatcher.video)
def _generate_random_commandline():
    """Pick one entry from each group in ``OPTIONS`` and return the argv tokens.

    Each chosen entry is split with ``shlex.split`` and empty tokens are
    dropped, so an empty-string entry contributes nothing.  Returns a new
    list of strings; an empty ``OPTIONS`` yields an empty list.
    """
    commandline = []

    for suboptions in OPTIONS:
        # One randrange() call per option group, in group order, so a given
        # seed keeps producing the same command lines.
        chosen = suboptions[randrange(0, len(suboptions))]

        # extend() flattens in place; reduce(operator.add, ...) re-copied the
        # accumulated list on every step and failed on an empty OPTIONS.
        commandline.extend(token for token in shlex.split(chosen) if token)

    return commandline
357 fixture.register_case(Compile)
359 fixture.register_case(Regression)
361 class Dispatcher(_Dispatcher):
362 video = "akiyo_qcif.y4m"
368 def _populate_parser(self):
369 super(Dispatcher, self)._populate_parser()
371 # don't do a whole lot with this
372 tcase = _YUVOutputComparisonFactory()
374 yuv_tests = [ name[5:] for name, meth in filter(lambda pair: pair[0][:5] == "test_", inspect.getmembers(tcase)) ]
376 group = OptionGroup(self.optparse, "x264 testing-specific options")
385 callback=lambda option, opt, value, parser: setattr(self, "video", value),
386 help="yuv video to perform testing on (default: %s)" % self.video
396 callback=lambda option, opt, value, parser: setattr(self, "seed", value),
397 help="seed for the random number generator (default: unix timestamp)"
407 callback=lambda option, opt, value, parser: setattr(self, "products", value),
408 help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
417 callback=lambda option, opt, value, parser: setattr(self, "configure", shlex.split(value)),
418 help="options to run ./configure with"
426 callback=lambda option, opt, value, parser: setattr(self, "yuv_tests", [
427 val.strip() for val in value.split(",")
429 help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
430 ", ".join(self.yuv_tests),
441 callback=lambda option, opt, value, parser: setattr(self, "x264", shlex.split(value)),
442 help="additional options to run ./x264 with"
445 self.optparse.add_option_group(group)
447 def pre_dispatch(self):
448 if not hasattr(self, "seed"):
449 self.seed = int(time())
451 print "Using seed: %d" % self.seed
454 for i in xrange(self.products):
455 YUVOutputComparison = _YUVOutputComparisonFactory()
457 commandline = _generate_random_commandline()
461 while commandline in _generated:
463 commandline = _generate_random_commandline()
466 print >>sys.stderr, "Maximum command-line regeneration exceeded. " \
467 "Try a different seed or specify fewer products to generate."
470 commandline += self.x264
472 _generated.append(commandline)
474 YUVOutputComparison.options = commandline
475 YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(commandline)))
477 fixture.register_case(YUVOutputComparison)
479 Dispatcher(fixture).dispatch()