]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer.cpp
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / core / consumer / frame_consumer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "../StdAfx.h"\r
23 \r
24 #include "frame_consumer.h"\r
25 \r
26 #include <common/except.h>\r
27 \r
28 #include <core/video_format.h>\r
29 #include <core/frame/frame.h>\r
30 \r
31 #include <boost/thread.hpp>\r
32 \r
33 namespace caspar { namespace core {\r
34                 \r
35 std::vector<const consumer_factory_t> g_factories;\r
36 \r
37 void register_consumer_factory(const consumer_factory_t& factory)\r
38 {\r
39         g_factories.push_back(factory);\r
40 }\r
41 \r
42 class destroy_consumer_proxy : public frame_consumer\r
43 {       \r
44         std::shared_ptr<frame_consumer> consumer_;\r
45 public:\r
46         destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
47                 : consumer_(std::move(consumer))\r
48         {\r
49         }\r
50 \r
51         ~destroy_consumer_proxy()\r
52         {               \r
53                 static tbb::atomic<int> counter = tbb::atomic<int>();\r
54                         \r
55                 ++counter;\r
56                 CASPAR_VERIFY(counter < 8);\r
57                 \r
58                 auto consumer = new std::shared_ptr<frame_consumer>(std::move(consumer_));\r
59                 boost::thread([=]\r
60                 {\r
61                         std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);\r
62 \r
63                         auto str = (*consumer)->print();\r
64                         try\r
65                         {\r
66                                 if(!consumer->unique())\r
67                                         CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();\r
68                                 else\r
69                                         CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
70                         }\r
71                         catch(...){}\r
72 \r
73                         pointer_guard.reset();\r
74 \r
75                         --counter;\r
76                 }).detach(); \r
77         }\r
78         \r
79         virtual bool send(const_frame frame) override                                                                                                   {return consumer_->send(std::move(frame));}\r
80         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
81         virtual std::wstring print() const override                                                                                                                     {return consumer_->print();}    \r
82         virtual std::wstring name() const override                                                                                                                      {return consumer_->name();}\r
83         virtual boost::property_tree::wptree info() const override                                                                                      {return consumer_->info();}\r
84         virtual bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}\r
85         virtual int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}\r
86         virtual int index() const override                                                                                                                                      {return consumer_->index();}\r
87 };\r
88 \r
89 class print_consumer_proxy : public frame_consumer\r
90 {       \r
91         std::shared_ptr<frame_consumer> consumer_;\r
92 public:\r
93         print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
94                 : consumer_(std::move(consumer))\r
95         {\r
96                 CASPAR_LOG(info) << consumer_->print() << L" Initialized.";\r
97         }\r
98 \r
99         ~print_consumer_proxy()\r
100         {               \r
101                 auto str = consumer_->print();\r
102                 CASPAR_LOG(trace) << str << L" Uninitializing.";\r
103                 consumer_.reset();\r
104                 CASPAR_LOG(info) << str << L" Uninitialized.";\r
105         }\r
106         \r
107         virtual bool send(const_frame frame) override                                                                                                   {return consumer_->send(std::move(frame));}\r
108         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
109         virtual std::wstring print() const override                                                                                                                     {return consumer_->print();}\r
110         virtual std::wstring name() const override                                                                                                                      {return consumer_->name();}\r
111         virtual boost::property_tree::wptree info() const override                                                                                      {return consumer_->info();}\r
112         virtual bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}\r
113         virtual int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}\r
114         virtual int index() const override                                                                                                                                      {return consumer_->index();}\r
115 };\r
116 \r
117 class recover_consumer_proxy : public frame_consumer\r
118 {       \r
119         std::shared_ptr<frame_consumer> consumer_;\r
120         int                                                             channel_index_;\r
121         video_format_desc                               format_desc_;\r
122 public:\r
123         recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
124                 : consumer_(std::move(consumer))\r
125         {\r
126         }\r
127         \r
128         virtual bool send(const_frame frame)                                    \r
129         {\r
130                 try\r
131                 {\r
132                         return consumer_->send(frame);\r
133                 }\r
134                 catch(...)\r
135                 {\r
136                         CASPAR_LOG_CURRENT_EXCEPTION();\r
137                         try\r
138                         {\r
139                                 consumer_->initialize(format_desc_, channel_index_);\r
140                                 return consumer_->send(frame);\r
141                         }\r
142                         catch(...)\r
143                         {\r
144                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
145                                 CASPAR_LOG(error) << print() << "Failed to recover consumer.";\r
146                                 return false;\r
147                         }\r
148                 }\r
149         }\r
150 \r
151         virtual void initialize(const struct video_format_desc& format_desc, int channel_index)         \r
152         {\r
153                 format_desc_    = format_desc;\r
154                 channel_index_  = channel_index;\r
155                 return consumer_->initialize(format_desc, channel_index);\r
156         }\r
157 \r
158         virtual std::wstring print() const override                                     {return consumer_->print();}\r
159         virtual std::wstring name() const override                                      {return consumer_->name();}\r
160         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
161         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
162         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
163         virtual int index() const override                                                      {return consumer_->index();}\r
164 };\r
165 \r
166 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
167 class cadence_guard : public frame_consumer\r
168 {\r
169         spl::shared_ptr<frame_consumer>         consumer_;\r
170         std::vector<int>                                        audio_cadence_;\r
171         boost::circular_buffer<std::size_t>     sync_buffer_;\r
172 public:\r
173         cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
174                 : consumer_(consumer)\r
175         {\r
176         }\r
177         \r
178         virtual void initialize(const video_format_desc& format_desc, int channel_index) override\r
179         {\r
180                 audio_cadence_  = format_desc.audio_cadence;\r
181                 sync_buffer_    = boost::circular_buffer<std::size_t>(format_desc.audio_cadence.size());\r
182                 consumer_->initialize(format_desc, channel_index);\r
183         }\r
184 \r
185         virtual bool send(const_frame frame) override\r
186         {               \r
187                 if(audio_cadence_.size() == 1)\r
188                         return consumer_->send(frame);\r
189 \r
190                 bool result = true;\r
191                 \r
192                 if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame.audio_data().size())) \r
193                 {       \r
194                         // Audio sent so far is in sync, now we can send the next chunk.\r
195                         result = consumer_->send(frame);\r
196                         boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
197                 }\r
198                 else\r
199                         CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
200 \r
201                 sync_buffer_.push_back(static_cast<int>(frame.audio_data().size()));\r
202                 \r
203                 return result;\r
204         }\r
205         \r
206         virtual std::wstring print() const override                                     {return consumer_->print();}\r
207         virtual std::wstring name() const override                                      {return consumer_->name();}\r
208         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
209         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
210         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
211         virtual int index() const override                                                      {return consumer_->index();}\r
212 };\r
213 \r
214 spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
215 {\r
216         if(params.empty())\r
217                 BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
218         \r
219         auto consumer = frame_consumer::empty();\r
220         std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool\r
221                 {\r
222                         try\r
223                         {\r
224                                 consumer = factory(params);\r
225                         }\r
226                         catch(...)\r
227                         {\r
228                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
229                         }\r
230                         return consumer != frame_consumer::empty();\r
231                 });\r
232 \r
233         if(consumer == frame_consumer::empty())\r
234                 BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
235 \r
236         return spl::make_shared<destroy_consumer_proxy>(\r
237                         spl::make_shared<print_consumer_proxy>(\r
238                          spl::make_shared<recover_consumer_proxy>(\r
239                           spl::make_shared<cadence_guard>(\r
240                            std::move(consumer)))));\r
241 }\r
242 \r
243 const spl::shared_ptr<frame_consumer>& frame_consumer::empty()\r
244 {\r
245         class empty_frame_consumer : public frame_consumer\r
246         {\r
247         public:\r
248                 virtual bool send(const_frame) override {return false;}\r
249                 virtual void initialize(const video_format_desc&, int) override{}\r
250                 virtual std::wstring print() const override {return L"empty";}\r
251                 virtual std::wstring name() const override {return L"empty";}\r
252                 virtual bool has_synchronization_clock() const override {return false;}\r
253                 virtual int buffer_depth() const override {return 0;};\r
254                 virtual int index() const{return -1;}\r
255                 virtual boost::property_tree::wptree info() const override\r
256                 {\r
257                         boost::property_tree::wptree info;\r
258                         info.add(L"type", L"empty");\r
259                         return info;\r
260                 }\r
261         };\r
262         static spl::shared_ptr<frame_consumer> consumer = spl::make_shared<empty_frame_consumer>();\r
263         return consumer;\r
264 }\r
265 \r
266 }}