]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer.cpp
ec75e99c92ef83c3ea96958a168d2b2997a35802
[casparcg] / core / consumer / frame_consumer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "../StdAfx.h"\r
23 \r
24 #include "frame_consumer.h"\r
25 \r
26 #include <common/except.h>\r
27 \r
28 #include <core/video_format.h>\r
29 #include <core/frame/frame.h>\r
30 \r
31 #include <boost/thread.hpp>\r
32 \r
33 namespace caspar { namespace core {\r
34                 \r
35 std::vector<const consumer_factory_t> g_factories;\r
36 \r
37 void register_consumer_factory(const consumer_factory_t& factory)\r
38 {\r
39         g_factories.push_back(factory);\r
40 }\r
41 \r
42 class destroy_consumer_proxy : public frame_consumer\r
43 {       \r
44         std::shared_ptr<frame_consumer> consumer_;\r
45 public:\r
46         destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
47                 : consumer_(std::move(consumer))\r
48         {\r
49         }\r
50 \r
51         ~destroy_consumer_proxy()\r
52         {               \r
53                 static tbb::atomic<int> counter = tbb::atomic<int>();\r
54                         \r
55                 ++counter;\r
56                 CASPAR_VERIFY(counter < 8);\r
57                 \r
58                 auto consumer = new std::shared_ptr<frame_consumer>(std::move(consumer_));\r
59                 boost::thread([=]\r
60                 {\r
61                         std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);\r
62 \r
63                         auto str = (*consumer)->print();\r
64                         try\r
65                         {\r
66                                 if(!consumer->unique())\r
67                                         CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();\r
68                                 else\r
69                                         CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
70                         }\r
71                         catch(...){}\r
72 \r
73                         pointer_guard.reset();\r
74 \r
75                         --counter;\r
76                 }).detach(); \r
77         }\r
78         \r
79         bool send(const_frame frame) override                                                                                                                           {return consumer_->send(std::move(frame));}\r
80         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
81         std::wstring print() const override                                                                                                                                     {return consumer_->print();}    \r
82         std::wstring name() const override                                                                                                                                      {return consumer_->name();}\r
83         boost::property_tree::wptree info() const override                                                                                                      {return consumer_->info();}\r
84         bool has_synchronization_clock() const override                                                                                                         {return consumer_->has_synchronization_clock();}\r
85         int buffer_depth() const override                                                                                                                                       {return consumer_->buffer_depth();}\r
86         int index() const override                                                                                                                                                      {return consumer_->index();}\r
87         void subscribe(const monitor::observable::observer_ptr& o) override                                                                     {consumer_->subscribe(o);}\r
88         void unsubscribe(const monitor::observable::observer_ptr& o) override                                                           {consumer_->unsubscribe(o);}                    \r
89 };\r
90 \r
91 class print_consumer_proxy : public frame_consumer\r
92 {       \r
93         std::shared_ptr<frame_consumer> consumer_;\r
94 public:\r
95         print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
96                 : consumer_(std::move(consumer))\r
97         {\r
98                 CASPAR_LOG(info) << consumer_->print() << L" Initialized.";\r
99         }\r
100 \r
101         ~print_consumer_proxy()\r
102         {               \r
103                 auto str = consumer_->print();\r
104                 CASPAR_LOG(trace) << str << L" Uninitializing.";\r
105                 consumer_.reset();\r
106                 CASPAR_LOG(info) << str << L" Uninitialized.";\r
107         }\r
108         \r
109         bool send(const_frame frame) override                                                                                                   {return consumer_->send(std::move(frame));}\r
110         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
111         std::wstring print() const override                                                                                                                     {return consumer_->print();}\r
112         std::wstring name() const override                                                                                                                      {return consumer_->name();}\r
113         boost::property_tree::wptree info() const override                                                                                      {return consumer_->info();}\r
114         bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}\r
115         int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}\r
116         int index() const override                                                                                                                                      {return consumer_->index();}\r
117         void subscribe(const monitor::observable::observer_ptr& o) override                                                                     {consumer_->subscribe(o);}\r
118         void unsubscribe(const monitor::observable::observer_ptr& o) override                                                           {consumer_->unsubscribe(o);}    \r
119 };\r
120 \r
121 class recover_consumer_proxy : public frame_consumer\r
122 {       \r
123         std::shared_ptr<frame_consumer> consumer_;\r
124         int                                                             channel_index_;\r
125         video_format_desc                               format_desc_;\r
126 public:\r
127         recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
128                 : consumer_(std::move(consumer))\r
129         {\r
130         }\r
131         \r
132         virtual bool send(const_frame frame)                                    \r
133         {\r
134                 try\r
135                 {\r
136                         return consumer_->send(frame);\r
137                 }\r
138                 catch(...)\r
139                 {\r
140                         CASPAR_LOG_CURRENT_EXCEPTION();\r
141                         try\r
142                         {\r
143                                 consumer_->initialize(format_desc_, channel_index_);\r
144                                 return consumer_->send(frame);\r
145                         }\r
146                         catch(...)\r
147                         {\r
148                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
149                                 CASPAR_LOG(error) << print() << " Failed to recover consumer.";\r
150                                 return false;\r
151                         }\r
152                 }\r
153         }\r
154 \r
155         virtual void initialize(const struct video_format_desc& format_desc, int channel_index)         \r
156         {\r
157                 format_desc_    = format_desc;\r
158                 channel_index_  = channel_index;\r
159                 return consumer_->initialize(format_desc, channel_index);\r
160         }\r
161 \r
162         std::wstring print() const override                                                                             {return consumer_->print();}\r
163         std::wstring name() const override                                                                              {return consumer_->name();}\r
164         boost::property_tree::wptree info() const override                                              {return consumer_->info();}\r
165         bool has_synchronization_clock() const override                                                 {return consumer_->has_synchronization_clock();}\r
166         int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}\r
167         int index() const override                                                                                              {return consumer_->index();}\r
168         void subscribe(const monitor::observable::observer_ptr& o) override             {consumer_->subscribe(o);}\r
169         void unsubscribe(const monitor::observable::observer_ptr& o) override   {consumer_->unsubscribe(o);}    \r
170 };\r
171 \r
172 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
173 class cadence_guard : public frame_consumer\r
174 {\r
175         spl::shared_ptr<frame_consumer>         consumer_;\r
176         std::vector<int>                                        audio_cadence_;\r
177         boost::circular_buffer<std::size_t>     sync_buffer_;\r
178 public:\r
179         cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
180                 : consumer_(consumer)\r
181         {\r
182         }\r
183         \r
184         void initialize(const video_format_desc& format_desc, int channel_index) override\r
185         {\r
186                 audio_cadence_  = format_desc.audio_cadence;\r
187                 sync_buffer_    = boost::circular_buffer<std::size_t>(format_desc.audio_cadence.size());\r
188                 consumer_->initialize(format_desc, channel_index);\r
189         }\r
190 \r
191         bool send(const_frame frame) override\r
192         {               \r
193                 if(audio_cadence_.size() == 1)\r
194                         return consumer_->send(frame);\r
195 \r
196                 bool result = true;\r
197                 \r
198                 if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame.audio_data().size())) \r
199                 {       \r
200                         // Audio sent so far is in sync, now we can send the next chunk.\r
201                         result = consumer_->send(frame);\r
202                         boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
203                 }\r
204                 else\r
205                         CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
206 \r
207                 sync_buffer_.push_back(static_cast<int>(frame.audio_data().size()));\r
208                 \r
209                 return result;\r
210         }\r
211         \r
212         std::wstring print() const override                                                                             {return consumer_->print();}\r
213         std::wstring name() const override                                                                              {return consumer_->name();}\r
214         boost::property_tree::wptree info() const override                                              {return consumer_->info();}\r
215         bool has_synchronization_clock() const override                                                 {return consumer_->has_synchronization_clock();}\r
216         int buffer_depth() const override                                                                               {return consumer_->buffer_depth();}\r
217         int index() const override                                                                                              {return consumer_->index();}\r
218         void subscribe(const monitor::observable::observer_ptr& o) override             {consumer_->subscribe(o);}\r
219         void unsubscribe(const monitor::observable::observer_ptr& o) override   {consumer_->unsubscribe(o);}    \r
220 };\r
221 \r
222 spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
223 {\r
224         if(params.empty())\r
225                 CASPAR_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
226         \r
227         auto consumer = frame_consumer::empty();\r
228         std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool\r
229                 {\r
230                         try\r
231                         {\r
232                                 consumer = factory(params);\r
233                         }\r
234                         catch(...)\r
235                         {\r
236                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
237                         }\r
238                         return consumer != frame_consumer::empty();\r
239                 });\r
240 \r
241         if(consumer == frame_consumer::empty())\r
242                 CASPAR_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
243 \r
244         return spl::make_shared<destroy_consumer_proxy>(\r
245                         spl::make_shared<print_consumer_proxy>(\r
246                          spl::make_shared<recover_consumer_proxy>(\r
247                           spl::make_shared<cadence_guard>(\r
248                            std::move(consumer)))));\r
249 }\r
250 \r
251 const spl::shared_ptr<frame_consumer>& frame_consumer::empty()\r
252 {\r
253         class empty_frame_consumer : public frame_consumer\r
254         {\r
255         public:\r
256                 bool send(const_frame) override {return false;}\r
257                 void initialize(const video_format_desc&, int) override{}\r
258                 std::wstring print() const override {return L"empty";}\r
259                 std::wstring name() const override {return L"empty";}\r
260                 bool has_synchronization_clock() const override {return false;}\r
261                 int buffer_depth() const override {return 0;};\r
262                 virtual int index() const{return -1;}\r
263                 void subscribe(const monitor::observable::observer_ptr& o) override{}\r
264                 void unsubscribe(const monitor::observable::observer_ptr& o) override{}\r
265                 boost::property_tree::wptree info() const override\r
266                 {\r
267                         boost::property_tree::wptree info;\r
268                         info.add(L"type", L"empty");\r
269                         return info;\r
270                 }\r
271         };\r
272         static spl::shared_ptr<frame_consumer> consumer = spl::make_shared<empty_frame_consumer>();\r
273         return consumer;\r
274 }\r
275 \r
276 }}