]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer.cpp
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / core / consumer / frame_consumer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "../StdAfx.h"\r
23 \r
24 #include "frame_consumer.h"\r
25 \r
26 #include <common/except.h>\r
27 \r
28 #include <core/video_format.h>\r
29 #include <core/frame/data_frame.h>\r
30 \r
31 #include <common/concurrency/async.h>\r
32 \r
33 namespace caspar { namespace core {\r
34                 \r
35 std::vector<const consumer_factory_t> g_factories;\r
36 \r
37 void register_consumer_factory(const consumer_factory_t& factory)\r
38 {\r
39         g_factories.push_back(factory);\r
40 }\r
41 \r
42 class destroy_consumer_proxy : public frame_consumer\r
43 {       \r
44         std::unique_ptr<std::shared_ptr<frame_consumer>> consumer_;\r
45 public:\r
46         destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
47                 : consumer_(new std::shared_ptr<frame_consumer>(std::move(consumer)))\r
48         {\r
49         }\r
50 \r
51         ~destroy_consumer_proxy()\r
52         {               \r
53                 static tbb::atomic<int> counter = tbb::atomic<int>();\r
54                         \r
55                 ++counter;\r
56                 CASPAR_VERIFY(counter < 32);\r
57                 \r
58                 auto consumer = consumer_.release();\r
59                 async([=]\r
60                 {\r
61                         std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);\r
62 \r
63                         auto str = (*consumer)->print();\r
64                         try\r
65                         {\r
66                                 if(!consumer->unique())\r
67                                         CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();\r
68                                 else\r
69                                         CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
70                         }\r
71                         catch(...){}\r
72 \r
73                         pointer_guard.reset();\r
74 \r
75                         --counter;\r
76                 }); \r
77         }\r
78         \r
79         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) override                                       {return (*consumer_)->send(frame);}\r
80         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return (*consumer_)->initialize(format_desc, channel_index);}\r
81         virtual std::wstring print() const override                                                                                                                     {return (*consumer_)->print();}\r
82         virtual boost::property_tree::wptree info() const override                                                                                      {return (*consumer_)->info();}\r
83         virtual bool has_synchronization_clock() const override                                                                                         {return (*consumer_)->has_synchronization_clock();}\r
84         virtual int buffer_depth() const override                                                                                                                       {return (*consumer_)->buffer_depth();}\r
85         virtual int index() const override                                                                                                                                      {return (*consumer_)->index();}\r
86 };\r
87 \r
88 class print_consumer_proxy : public frame_consumer\r
89 {       \r
90         std::shared_ptr<frame_consumer> consumer_;\r
91 public:\r
92         print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
93                 : consumer_(std::move(consumer))\r
94         {\r
95                 CASPAR_LOG(info) << consumer_->print() << L" Initialized.";\r
96         }\r
97 \r
98         ~print_consumer_proxy()\r
99         {               \r
100                 auto str = consumer_->print();\r
101                 CASPAR_LOG(trace) << str << L" Uninitializing.";\r
102                 consumer_.reset();\r
103                 CASPAR_LOG(info) << str << L" Uninitialized.";\r
104         }\r
105         \r
106         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) override                                       {return consumer_->send(frame);}\r
107         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
108         virtual std::wstring print() const override                                                                                                                     {return consumer_->print();}\r
109         virtual boost::property_tree::wptree info() const override                                                                                      {return consumer_->info();}\r
110         virtual bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}\r
111         virtual int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}\r
112         virtual int index() const override                                                                                                                                      {return consumer_->index();}\r
113 };\r
114 \r
115 class recover_consumer_proxy : public frame_consumer\r
116 {       \r
117         std::shared_ptr<frame_consumer> consumer_;\r
118         int                                                             channel_index_;\r
119         video_format_desc                               format_desc_;\r
120 public:\r
121         recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
122                 : consumer_(std::move(consumer))\r
123         {\r
124         }\r
125         \r
126         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame)                                        \r
127         {\r
128                 try\r
129                 {\r
130                         return consumer_->send(frame);\r
131                 }\r
132                 catch(...)\r
133                 {\r
134                         CASPAR_LOG_CURRENT_EXCEPTION();\r
135                         try\r
136                         {\r
137                                 consumer_->initialize(format_desc_, channel_index_);\r
138                                 return consumer_->send(frame);\r
139                         }\r
140                         catch(...)\r
141                         {\r
142                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
143                                 CASPAR_LOG(error) << print() << "Failed to recover consumer.";\r
144                                 return false;\r
145                         }\r
146                 }\r
147         }\r
148 \r
149         virtual void initialize(const struct video_format_desc& format_desc, int channel_index)         \r
150         {\r
151                 format_desc_    = format_desc;\r
152                 channel_index_  = channel_index;\r
153                 return consumer_->initialize(format_desc, channel_index);\r
154         }\r
155 \r
156         virtual std::wstring print() const override                                     {return consumer_->print();}\r
157         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
158         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
159         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
160         virtual int index() const override                                                      {return consumer_->index();}\r
161 };\r
162 \r
163 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
164 class cadence_guard : public frame_consumer\r
165 {\r
166         spl::shared_ptr<frame_consumer>         consumer_;\r
167         std::vector<int>                                audio_cadence_;\r
168         boost::circular_buffer<int>     sync_buffer_;\r
169 public:\r
170         cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
171                 : consumer_(consumer)\r
172         {\r
173         }\r
174         \r
175         virtual void initialize(const video_format_desc& format_desc, int channel_index) override\r
176         {\r
177                 audio_cadence_  = format_desc.audio_cadence;\r
178                 sync_buffer_    = boost::circular_buffer<int>(format_desc.audio_cadence.size());\r
179                 consumer_->initialize(format_desc, channel_index);\r
180         }\r
181 \r
182         virtual bool send(const spl::shared_ptr<const data_frame>& frame) override\r
183         {               \r
184                 if(audio_cadence_.size() == 1)\r
185                         return consumer_->send(frame);\r
186 \r
187                 bool result = true;\r
188                 \r
189                 if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame->audio_data().size())) \r
190                 {       \r
191                         // Audio sent so far is in sync, now we can send the next chunk.\r
192                         result = consumer_->send(frame);\r
193                         boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
194                 }\r
195                 else\r
196                         CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
197 \r
198                 sync_buffer_.push_back(static_cast<int>(frame->audio_data().size()));\r
199                 \r
200                 return result;\r
201         }\r
202         \r
203         virtual std::wstring print() const override                                     {return consumer_->print();}\r
204         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
205         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
206         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
207         virtual int index() const override                                                      {return consumer_->index();}\r
208 };\r
209 \r
210 spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
211 {\r
212         if(params.empty())\r
213                 BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
214         \r
215         auto consumer = frame_consumer::empty();\r
216         std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool\r
217                 {\r
218                         try\r
219                         {\r
220                                 consumer = factory(params);\r
221                         }\r
222                         catch(...)\r
223                         {\r
224                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
225                         }\r
226                         return consumer != frame_consumer::empty();\r
227                 });\r
228 \r
229         if(consumer == frame_consumer::empty())\r
230                 BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
231 \r
232         return spl::make_shared<destroy_consumer_proxy>(\r
233                         spl::make_shared<print_consumer_proxy>(\r
234                          spl::make_shared<recover_consumer_proxy>(\r
235                           spl::make_shared<cadence_guard>(\r
236                            std::move(consumer)))));\r
237 }\r
238 \r
239 const spl::shared_ptr<frame_consumer>& frame_consumer::empty()\r
240 {\r
241         struct empty_frame_consumer : public frame_consumer\r
242         {\r
243                 virtual bool send(const spl::shared_ptr<const data_frame>&) override {return false;}\r
244                 virtual void initialize(const video_format_desc&, int) override{}\r
245                 virtual std::wstring print() const override {return L"empty";}\r
246                 virtual bool has_synchronization_clock() const override {return false;}\r
247                 virtual int buffer_depth() const override {return 0;};\r
248                 virtual int index() const{return -1;}\r
249                 virtual boost::property_tree::wptree info() const override\r
250                 {\r
251                         boost::property_tree::wptree info;\r
252                         info.add(L"type", L"empty-consumer");\r
253                         return info;\r
254                 }\r
255         };\r
256         static spl::shared_ptr<frame_consumer> consumer = spl::make_shared<empty_frame_consumer>();\r
257         return consumer;\r
258 }\r
259 \r
260 }}