]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/tbb_avcodec.cpp
2.0.0.2: -ffmpeg_producer: Implemented slice based multithreading.
[casparcg] / modules / ffmpeg / tbb_avcodec.cpp
1 // Author Robert Nagy\r
2 \r
3 #include "stdafx.h"\r
4 \r
5 #include "tbb_avcodec.h"\r
6 \r
7 #include <common/log/log.h>\r
8 \r
9 #include <tbb/task.h>\r
10 #include <tbb/atomic.h>\r
11 \r
12 extern "C" \r
13 {\r
14         #define __STDC_CONSTANT_MACROS\r
15         #define __STDC_LIMIT_MACROS\r
16         #include <libavformat/avformat.h>\r
17 }\r
18 \r
19 namespace caspar {\r
20 \r
21 struct thread_context{};\r
22 \r
23 int task_execute(AVCodecContext* s, const std::function<int(void* arg, int arg_size, int jobnr, int threadnr)>& func, void* arg, int* ret, int count, int size)\r
24 {       \r
25         // jobnr global for all threads? Doesn't order matter?\r
26     tbb::atomic<int> counter;\r
27         counter = 0;\r
28                 \r
29         // Execute s->thread_count number of tasks in parallel.\r
30         tbb::parallel_for(0, s->thread_count, 1, [&](int threadnr) \r
31         {\r
32                 while(true)\r
33                 {\r
34                         int jobnr = counter++;\r
35                         if(jobnr >= count)\r
36                                 break;\r
37 \r
38                         int r = func(arg, size, jobnr, threadnr);\r
39                         if (ret)\r
40                                 ret[jobnr] = r;\r
41                 }\r
42         });\r
43         \r
44     return 0;\r
45 }\r
46         \r
47 int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)\r
48 {\r
49         return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int\r
50         {\r
51                 return func(s, reinterpret_cast<uint8_t*>(arg) + jobnr*size);\r
52         }, arg, ret, count, size);\r
53 }\r
54 \r
55 int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)\r
56 {\r
57         return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int\r
58         {\r
59                 return func(s, arg, jobnr, threadnr);\r
60         }, arg, ret, count, 0);\r
61 }\r
62 \r
63 int thread_init(AVCodecContext *s)\r
64 {\r
65         // Only makes sense for slicing since decode is already called through task scheduler.\r
66     if(!(s->thread_type & FF_THREAD_SLICE))            \r
67        return 0;    \r
68 \r
69         static const size_t MAX_THREADS = 16; // See mpegvideo.h\r
70 \r
71     s->active_thread_type = FF_THREAD_SLICE;\r
72         s->thread_opaque          = malloc(sizeof(thread_context));      \r
73     s->execute                    = thread_execute;\r
74     s->execute2                   = thread_execute2;\r
75     s->thread_count               = MAX_THREADS; // We are using a taskscheduler, so use as many "threads/tasks" as possible. \r
76 \r
77         CASPAR_LOG(info) << "Initialized ffmpeg tbb context.";\r
78 \r
79     return 0;\r
80 }\r
81 \r
82 void thread_free(AVCodecContext* s)\r
83 {\r
84         if(!s->thread_opaque)\r
85                 return;\r
86         \r
87         free(s->thread_opaque); \r
88         s->thread_opaque = nullptr;\r
89 \r
90         CASPAR_LOG(info) << "Released ffmpeg tbb context.";\r
91 }\r
92 \r
93 int tbb_avcodec_open(AVCodecContext* avctx, AVCodec* codec)\r
94 {\r
95         thread_init(avctx);\r
96         // ff_thread_init will not be executed since thread_opaque != nullptr.\r
97         return avcodec_open(avctx, codec); \r
98 }\r
99 \r
100 int tbb_avcodec_close(AVCodecContext* avctx)\r
101 {\r
102         thread_free(avctx);\r
103         // ff_thread_free will not be executed since thread_opaque == nullptr.\r
104         return avcodec_close(avctx); \r
105 }\r
106 \r
107 }