]> git.sesse.net Git - x264/blob - tools/test_x264.py
Add Python regression test script
[x264] / tools / test_x264.py
1 #!/usr/bin/env python
2
3 import operator
4
5 from optparse import OptionGroup
6
7 import sys
8
9 from time import time
10
11 from digress.cli import Dispatcher as _Dispatcher
12 from digress.errors import ComparisonError, FailedTestError, DisabledTestError
13 from digress.testing import depends, comparer, Fixture, Case
14 from digress.comparers import compare_pass
15 from digress.scm import git as x264git
16
17 from subprocess import Popen, PIPE, STDOUT
18
19 import os
20 import re
21 import shlex
22 import inspect
23
24 from random import randrange, seed
25 from math import ceil
26
27 from itertools import imap, izip
28
# Switch to the parent of the directory holding this script, so that the
# relative paths used further down ("./configure", "./x264", "make") are
# resolved from there.
os.chdir(os.path.join(os.path.dirname(__file__), ".."))
30
31 # options
32
# Table of x264 switches used to build random command lines.  Every entry is
# one group of alternatives; exactly one member of each group is picked per
# command line.  An empty string stands for "do not pass this switch".
OPTIONS = [
    ["--tune film", "--tune zerolatency"],
    ("", "--intra-refresh"),
    ("", "--no-cabac"),
    ["--preset " + p for p in (
        "ultrafast", "superfast", "veryfast", "faster",
        "fast", "medium", "slow", "slower",
    )],
]
46
47 # end options
48
def _locate_yuv_offset(offset, width, height):
    """Translate a byte offset inside a planar YUV dump into a position.

    The dump is treated as a sequence of frames, each stored as a
    full-resolution Y plane followed by a U and a V plane that are halved in
    both directions, one byte per sample (i.e. 1.5 * width * height bytes per
    frame for even dimensions).  Odd dimensions round the chroma size up.

    Returns ``(frame, plane, (mb_x, mb_y))`` where ``plane`` is ``"Y"``,
    ``"U"`` or ``"V"`` and the macroblock indices are zero-based: a
    macroblock covers 16x16 luma samples, hence 8x8 samples of a chroma plane.
    """
    chroma_width = (width + 1) // 2
    chroma_height = (height + 1) // 2

    luma_area = width * height
    chroma_area = chroma_width * chroma_height

    # integer arithmetic throughout, so frame and macroblock stay ints
    frame, within = divmod(offset, luma_area + 2 * chroma_area)

    if within < luma_area:
        plane, stride, mb_size = "Y", width, 16
    elif within < luma_area + chroma_area:
        plane, stride, mb_size = "U", chroma_width, 8
        within -= luma_area
    else:
        plane, stride, mb_size = "V", chroma_width, 8
        within -= luma_area + chroma_area

    # chroma rows are only chroma_width samples long, so the row/column split
    # has to use the stride of the plane the offset actually falls in
    row, column = divmod(within, stride)

    return frame, plane, (column // mb_size, row // mb_size)


def compare_yuv_output(width, height):
    """Build a comparer for two raw YUV dumps of ``width`` x ``height`` video.

    The returned callable takes two file names and returns ``None`` when the
    files are byte-identical.  It raises ``ComparisonError`` when the sizes
    differ, or at the first differing byte, in which case the message names
    the frame, macroblock, plane and absolute offset of that byte.
    """
    def _compare_yuv_output(file_a, file_b):
        size_a = os.path.getsize(file_a)
        size_b = os.path.getsize(file_b)

        if size_a != size_b:
            raise ComparisonError("%s is not the same size as %s" % (
                file_a,
                file_b
            ))

        BUFFER_SIZE = 8196

        offset = 0

        # raw video is binary data: text mode could translate line endings
        # or stop at an EOF marker on some platforms
        with open(file_a, "rb") as f_a:
            with open(file_b, "rb") as f_b:
                while True:
                    chunk_a = f_a.read(BUFFER_SIZE)
                    chunk_b = f_b.read(BUFFER_SIZE)

                    if not chunk_a and not chunk_b:
                        break

                    if chunk_a != chunk_b:
                        # index of the first mismatch; if one chunk is merely
                        # shorter, the mismatch sits right after the shared part
                        shared = min(len(chunk_a), len(chunk_b))
                        first = next(
                            (k for k in range(shared) if chunk_a[k] != chunk_b[k]),
                            shared
                        )

                        offs = offset + first
                        frame, plane, macroblock = _locate_yuv_offset(offs, width, height)

                        raise ComparisonError("%s differs from %s at frame %d, " \
                                              "macroblock %s on the %s plane (offset %d)" % (
                            file_a,
                            file_b,
                            frame,
                            macroblock,
                            plane,
                            offs)
                        )

                    offset += len(chunk_a)

    return _compare_yuv_output
132
def program_exists(program):
    """Locate an executable, roughly the way a shell would.

    If ``program`` contains a directory component it is checked as given;
    otherwise every directory listed in the ``PATH`` environment variable is
    tried in order.

    Returns the path of the first executable regular file found, or ``None``
    when there is none (including when ``PATH`` is not set at all).
    """
    def is_exe(fpath):
        # isfile, not exists: a searchable directory also passes the X_OK
        # check but is not something that can be run
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        if is_exe(program):
            return program
        return None

    search_path = os.environ.get("PATH")
    if search_path is None:
        return None

    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if is_exe(candidate):
            return candidate

    return None
149
# Fixture subclass for the x264 tree; an instance of it is created at module
# level and the test cases are registered on that instance.
class x264(Fixture):
    # source-control backend: digress.scm.git, imported above as x264git
    scm = x264git
152
# Build step that the other cases list in their ``depends``: configure the
# tree, then run make.
class Compile(Case):
    @comparer(compare_pass)
    def test_configure(self):
        # Start from a clean tree.  The result of distclean is deliberately
        # ignored; only its output is drained.
        Popen(["make", "distclean"], stdout=PIPE, stderr=STDOUT).communicate()

        # ./configure plus whatever flags the dispatcher was given
        argv = ["./configure"] + self.fixture.dispatcher.configure
        proc = Popen(argv, stdout=PIPE, stderr=STDOUT)
        log = proc.communicate()[0]

        if proc.returncode != 0:
            # collapse the log onto a single line for the failure message
            raise FailedTestError("configure failed: %s" % log.replace("\n", " "))

    @depends("configure")
    @comparer(compare_pass)
    def test_make(self):
        proc = Popen(["make", "-j5"], stdout=PIPE, stderr=STDOUT)
        log = proc.communicate()[0]

        if proc.returncode != 0:
            raise FailedTestError("make failed: %s" % log.replace("\n", " "))
180
# Recognises a line of the shape (illustrative example)
#   "y4m [info]: 176x144p 1:1 @ 30000/1001 fps (cfr)"
# in the encoder's output.  Group 1 captures the width and group 2 the height.
_dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")
182
def _YUVOutputComparisonFactory():
    # Creates and returns a brand-new Case subclass on every call, so each
    # caller can set its own ``options`` / ``__name__`` on the class without
    # affecting classes returned by other calls.
    class YUVOutputComparison(Case):
        _dimension_pattern = _dimension_pattern

        depends = [ Compile ]
        options = []

        def __init__(self):
            # drop every test_* method that was not selected via the
            # dispatcher's yuv_tests list
            for name, meth in inspect.getmembers(self):
                if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
                    delattr(self.__class__, name)

        def _run_x264(self):
            # Encodes the dispatcher's video with this case's options, writing
            # "<video>.264" and a reconstructed "x264-output.yuv".
            # Returns (width, height) parsed from the encoder output; raises
            # FailedTestError if the encoder fails or prints no dimensions.
            x264_proc = Popen([
                "./x264",
                "-o",
                "%s.264" % self.fixture.dispatcher.video,
                "--dump-yuv",
                "x264-output.yuv"
            ] + self.options + [
                self.fixture.dispatcher.video
            ], stdout=PIPE, stderr=STDOUT)

            output = x264_proc.communicate()[0]
            if x264_proc.returncode != 0:
                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))

            # search(), not match(): the dimension line need not be the very
            # first thing the encoder prints
            matches = self._dimension_pattern.search(output)
            if matches is None:
                raise FailedTestError("could not find the video dimensions in x264 output: %s" % output.replace("\n", " "))

            return (int(matches.group(1)), int(matches.group(2)))

        def _remove_quietly(self, *paths):
            # Best-effort cleanup: a file that is missing or cannot be removed
            # must not mask the actual test result.
            for path in paths:
                try:
                    os.remove(path)
                except OSError:
                    pass

        def _compare_with_decoder(self, label, decoder_args, decoded, leftovers=()):
            # Shared body of the decoder tests: encode with x264, decode the
            # stream with the external decoder and require its output to be
            # byte-identical to x264's own reconstruction.  ``leftovers`` are
            # additional files to delete afterwards.
            try:
                runres = self._run_x264()

                decoder_proc = Popen(decoder_args, stdout=PIPE, stderr=STDOUT)

                output = decoder_proc.communicate()[0]
                if decoder_proc.returncode != 0:
                    raise FailedTestError("%s did not complete properly: %s" % (label, output.replace("\n", " ")))

                try:
                    compare_yuv_output(*runres)("x264-output.yuv", decoded)
                except ComparisonError as e:
                    raise FailedTestError(e)
            finally:
                self._remove_quietly(
                    "x264-output.yuv",
                    "%s.264" % self.fixture.dispatcher.video,
                    decoded,
                    *leftovers
                )

        @comparer(compare_pass)
        def test_jm(self):
            if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")

            self._compare_with_decoder("jm", [
                "ldecod",
                "-i",
                "%s.264" % self.fixture.dispatcher.video,
                "-o",
                "jm-output.yuv"
            ], "jm-output.yuv", ("log.dec", "dataDec.txt"))

        @comparer(compare_pass)
        def test_ffmpeg(self):
            if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")

            self._compare_with_decoder("ffmpeg", [
                "ffmpeg",
                "-i",
                "%s.264" % self.fixture.dispatcher.video,
                "ffmpeg-output.yuv"
            ], "ffmpeg-output.yuv")

    return YUVOutputComparison
285
# Quality regression case: encodes the dispatcher's video and returns the
# PSNR / SSIM figure that x264 reports.
class Regression(Case):
    depends = [ Compile ]

    _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
    _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")

    def __init__(self):
        # make the extra x264 flags part of the case name
        if self.fixture.dispatcher.x264:
            self.__class__.__name__ += " %s" % " ".join(self.fixture.dispatcher.x264)

    def _measure(self, switch, line_prefix, pattern, label):
        # Runs ./x264 with ``switch`` plus the dispatcher's extra flags and
        # returns, as a float, group 1 of ``pattern`` taken from the first
        # output line that starts with ``line_prefix``.
        # Raises FailedTestError when the encoder fails, when no such line is
        # printed, or when the line does not have the expected shape.
        # The encoded "<video>.264" file is removed in every case.
        encoded = "%s.264" % self.fixture.dispatcher.video

        try:
            x264_proc = Popen([
                "./x264",
                "-o",
                encoded,
                switch
            ] + self.fixture.dispatcher.x264 + [
                self.fixture.dispatcher.video
            ], stdout=PIPE, stderr=STDOUT)

            output = x264_proc.communicate()[0]

            if x264_proc.returncode != 0:
                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))

            for line in output.split("\n"):
                if line.startswith(line_prefix):
                    found = pattern.match(line)
                    if found is None:
                        raise FailedTestError("could not parse %s output from x264: %s" % (label, line))
                    return float(found.group(1))

            raise FailedTestError("no %s output caught from x264" % label)
        finally:
            # best-effort cleanup of the encoded stream
            try:
                os.remove(encoded)
            except OSError:
                pass

    def test_psnr(self):
        return self._measure("--psnr", "x264 [info]: PSNR Mean", self._psnr_pattern, "PSNR")

    def test_ssim(self):
        return self._measure("--ssim", "x264 [info]: SSIM Mean", self._ssim_pattern, "SSIM")
345
def _generate_random_commandline(options=None):
    """Build one random x264 argument list.

    ``options`` is a sequence of groups of alternative switch strings and
    defaults to the module-level ``OPTIONS`` table.  One alternative is drawn
    from each group, in order, using ``random.randrange``; the chosen strings
    are split shell-style and concatenated.  An empty alternative contributes
    no arguments.

    Returns a list of argument strings, empty when ``options`` is empty.
    """
    if options is None:
        options = OPTIONS

    chosen = [ group[randrange(0, len(group))] for group in options ]

    return [ token for opt in chosen for token in shlex.split(opt) if token ]
353
# Command lines already produced by Dispatcher.pre_dispatch; consulted there
# so that the same random command line is not used twice.
_generated = []

# The fixture all cases are registered on.  Compile and Regression are
# registered here; the YUV comparison cases are added later, in
# Dispatcher.pre_dispatch.
fixture = x264()
fixture.register_case(Compile)

fixture.register_case(Regression)
360
# Command-line front end: extends the digress dispatcher with x264-specific
# options and generates the randomised YUV comparison cases before dispatch.
class Dispatcher(_Dispatcher):
    # defaults; each can be overridden by the matching command-line option
    video = "akiyo_qcif.y4m"
    products = 50
    configure = []
    x264 = []
    yuv_tests = [ "jm" ]

    def _populate_parser(self):
        super(Dispatcher, self)._populate_parser()

        # don't do a whole lot with this
        # (a throwaway class, inspected only to list the available test names)
        tcase = _YUVOutputComparisonFactory()

        yuv_tests = [ name[5:] for name, meth in inspect.getmembers(tcase) if name[:5] == "test_" ]

        group = OptionGroup(self.optparse, "x264 testing-specific options")

        # every option stores its value on the dispatcher itself through a
        # callback instead of going through optparse's values object
        group.add_option(
            "-v",
            "--video",
            metavar="FILENAME",
            action="callback",
            dest="video",
            type=str,
            callback=lambda option, opt, value, parser: setattr(self, "video", value),
            help="yuv video to perform testing on (default: %s)" % self.video
        )

        group.add_option(
            "-s",
            "--seed",
            metavar="SEED",
            action="callback",
            dest="seed",
            type=int,
            callback=lambda option, opt, value, parser: setattr(self, "seed", value),
            help="seed for the random number generator (default: unix timestamp)"
        )

        group.add_option(
            "-p",
            "--product-tests",
            metavar="NUM",
            action="callback",
            dest="products",
            type=int,
            callback=lambda option, opt, value, parser: setattr(self, "products", value),
            help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
        )

        group.add_option(
            "--configure-with",
            metavar="FLAGS",
            action="callback",
            dest="configure",
            type=str,
            callback=lambda option, opt, value, parser: setattr(self, "configure", shlex.split(value)),
            help="options to run ./configure with"
        )

        group.add_option(
            "--yuv-tests",
            action="callback",
            dest="yuv_tests",
            type=str,
            callback=lambda option, opt, value, parser: setattr(self, "yuv_tests", [
                val.strip() for val in value.split(",")
            ]),
            help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
                ", ".join(self.yuv_tests),
                ", ".join(yuv_tests)
            )
        )

        group.add_option(
            "--x264-with",
            metavar="FLAGS",
            action="callback",
            dest="x264",
            type=str,
            callback=lambda option, opt, value, parser: setattr(self, "x264", shlex.split(value)),
            help="additional options to run ./x264 with"
        )

        self.optparse.add_option_group(group)

    def pre_dispatch(self):
        # Seeds the random number generator (printing the seed so that a run
        # can be reproduced) and registers ``self.products`` YUV comparison
        # cases, each with its own random command line.  Exits with status 1
        # if no unused command line is found within the retry limit.
        if not hasattr(self, "seed"):
            self.seed = int(time())

        # written through sys.stdout / sys.stderr so that the same code runs
        # whether or not ``print`` is a statement
        sys.stdout.write("Using seed: %d\n" % self.seed)
        seed(self.seed)

        for i in range(self.products):
            YUVOutputComparison = _YUVOutputComparisonFactory()

            commandline = _generate_random_commandline()

            counter = 0

            while commandline in _generated:
                counter += 1
                commandline = _generate_random_commandline()

                if counter > 100:
                    sys.stderr.write("Maximum command-line regeneration exceeded. "
                                     "Try a different seed or specify fewer products to generate.\n")
                    sys.exit(1)

            # Remember the command line WITHOUT the user-supplied extras: the
            # duplicate check above compares freshly generated (extra-free)
            # command lines, so storing the extended one would never match.
            _generated.append(commandline)

            options = commandline + self.x264

            YUVOutputComparison.options = options
            YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(options)))

            fixture.register_case(YUVOutputComparison)
478
# Script entry point: wrap the module-level fixture in the dispatcher defined
# above and hand control to it.
Dispatcher(fixture).dispatch()