git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer.cpp
2.1.0: Refactored frame_consumer exception handling and recovery,
[casparcg] / core / consumer / frame_consumer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "../StdAfx.h"\r
23 \r
24 #include "frame_consumer.h"\r
25 \r
26 #include <common/except.h>\r
27 \r
28 #include <core/video_format.h>\r
29 #include <core/frame/data_frame.h>\r
30 \r
31 #include <common/concurrency/async.h>\r
32 \r
33 namespace caspar { namespace core {\r
34                 \r
35 std::vector<const consumer_factory_t> g_factories;\r
36 \r
37 void register_consumer_factory(const consumer_factory_t& factory)\r
38 {\r
39         g_factories.push_back(factory);\r
40 }\r
41 \r
42 class destroy_consumer_proxy : public frame_consumer\r
43 {       \r
44         std::unique_ptr<std::shared_ptr<frame_consumer>> consumer_;\r
45 public:\r
46         destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
47                 : consumer_(new std::shared_ptr<frame_consumer>(std::move(consumer)))\r
48         {\r
49         }\r
50 \r
51         ~destroy_consumer_proxy()\r
52         {               \r
53                 static tbb::atomic<int> counter = tbb::atomic<int>();\r
54                         \r
55                 ++counter;\r
56                 CASPAR_VERIFY(counter < 32);\r
57                 \r
58                 auto consumer = consumer_.release();\r
59                 async([=]\r
60                 {\r
61                         std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);\r
62 \r
63                         auto str = (*consumer)->print();\r
64                         try\r
65                         {\r
66                                 if(!consumer->unique())\r
67                                         CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();\r
68                                 else\r
69                                         CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
70                         }\r
71                         catch(...){}\r
72 \r
73                         pointer_guard.reset();\r
74 \r
75                         --counter;\r
76                 }); \r
77         }\r
78         \r
79         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) override                                       {return (*consumer_)->send(frame);}\r
80         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return (*consumer_)->initialize(format_desc, channel_index);}\r
81         virtual std::wstring print() const override                                                                                                                     {return (*consumer_)->print();}\r
82         virtual boost::property_tree::wptree info() const override                                                                                      {return (*consumer_)->info();}\r
83         virtual bool has_synchronization_clock() const override                                                                                         {return (*consumer_)->has_synchronization_clock();}\r
84         virtual int buffer_depth() const override                                                                                                                       {return (*consumer_)->buffer_depth();}\r
85         virtual int index() const override                                                                                                                                      {return (*consumer_)->index();}\r
86 };\r
87 \r
88 class print_consumer_proxy : public frame_consumer\r
89 {       \r
90         std::shared_ptr<frame_consumer> consumer_;\r
91 public:\r
92         print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
93                 : consumer_(std::move(consumer))\r
94         {\r
95                 CASPAR_LOG(info) << consumer_->print() << L" Initialized.";\r
96         }\r
97 \r
98         ~print_consumer_proxy()\r
99         {               \r
100                 auto str = consumer_->print();\r
101                 CASPAR_LOG(trace) << str << L" Uninitializing.";\r
102                 consumer_.reset();\r
103                 CASPAR_LOG(info) << str << L" Uninitialized.";\r
104         }\r
105         \r
106         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) override                                       {return consumer_->send(frame);}\r
107         virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}\r
108         virtual std::wstring print() const override                                                                                                                     {return consumer_->print();}\r
109         virtual boost::property_tree::wptree info() const override                                                                                      {return consumer_->info();}\r
110         virtual bool has_synchronization_clock() const override                                                                                         {return consumer_->has_synchronization_clock();}\r
111         virtual int buffer_depth() const override                                                                                                                       {return consumer_->buffer_depth();}\r
112         virtual int index() const override                                                                                                                                      {return consumer_->index();}\r
113 };\r
114 \r
115 // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
116 class cadence_guard : public frame_consumer\r
117 {\r
118         spl::shared_ptr<frame_consumer>         consumer_;\r
119         std::vector<int>                                audio_cadence_;\r
120         boost::circular_buffer<int>     sync_buffer_;\r
121 public:\r
122         cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
123                 : consumer_(consumer)\r
124         {\r
125         }\r
126         \r
127         virtual void initialize(const video_format_desc& format_desc, int channel_index) override\r
128         {\r
129                 audio_cadence_  = format_desc.audio_cadence;\r
130                 sync_buffer_    = boost::circular_buffer<int>(format_desc.audio_cadence.size());\r
131                 consumer_->initialize(format_desc, channel_index);\r
132         }\r
133 \r
134         virtual bool send(const spl::shared_ptr<const data_frame>& frame) override\r
135         {               \r
136                 if(audio_cadence_.size() == 1)\r
137                         return consumer_->send(frame);\r
138 \r
139                 bool result = true;\r
140                 \r
141                 if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame->audio_data().size())) \r
142                 {       \r
143                         // Audio sent so far is in sync, now we can send the next chunk.\r
144                         result = consumer_->send(frame);\r
145                         boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
146                 }\r
147                 else\r
148                         CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
149 \r
150                 sync_buffer_.push_back(static_cast<int>(frame->audio_data().size()));\r
151                 \r
152                 return result;\r
153         }\r
154         \r
155         virtual std::wstring print() const override                                     {return consumer_->print();}\r
156         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
157         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
158         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
159         virtual int index() const override                                                      {return consumer_->index();}\r
160 };\r
161 \r
162 class recover_consumer_proxy : public frame_consumer\r
163 {       \r
164         std::shared_ptr<frame_consumer> consumer_;\r
165         int                                                             channel_index_;\r
166         video_format_desc                               format_desc_;\r
167 public:\r
168         recover_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
169                 : consumer_(std::move(consumer))\r
170         {\r
171         }\r
172         \r
173         virtual bool send(const spl::shared_ptr<const struct data_frame>& frame)                                        \r
174         {\r
175                 try\r
176                 {\r
177                         return consumer_->send(frame);\r
178                 }\r
179                 catch(...)\r
180                 {\r
181                         CASPAR_LOG_CURRENT_EXCEPTION();\r
182                         try\r
183                         {\r
184                                 consumer_->initialize(format_desc_, channel_index_);\r
185                                 return consumer_->send(frame);\r
186                         }\r
187                         catch(...)\r
188                         {\r
189                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
190                                 CASPAR_LOG(error) << print() << "Failed to recover consumer.";\r
191                                 return false;\r
192                         }\r
193                 }\r
194         }\r
195 \r
196         virtual void initialize(const struct video_format_desc& format_desc, int channel_index)         \r
197         {\r
198                 format_desc_    = format_desc;\r
199                 channel_index_  = channel_index;\r
200                 return consumer_->initialize(format_desc, channel_index);\r
201         }\r
202 \r
203         virtual std::wstring print() const override                                     {return consumer_->print();}\r
204         virtual boost::property_tree::wptree info() const override      {return consumer_->info();}\r
205         virtual bool has_synchronization_clock() const override         {return consumer_->has_synchronization_clock();}\r
206         virtual int buffer_depth() const override                                       {return consumer_->buffer_depth();}\r
207         virtual int index() const override                                                      {return consumer_->index();}\r
208 };\r
209 \r
210 spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
211 {\r
212         if(params.empty())\r
213                 BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
214         \r
215         auto consumer = frame_consumer::empty();\r
216         std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool\r
217                 {\r
218                         try\r
219                         {\r
220                                 consumer = factory(params);\r
221                         }\r
222                         catch(...)\r
223                         {\r
224                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
225                         }\r
226                         return consumer != frame_consumer::empty();\r
227                 });\r
228 \r
229         if(consumer == frame_consumer::empty())\r
230                 BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
231 \r
232         return spl::make_shared<destroy_consumer_proxy>(\r
233                         spl::make_shared<print_consumer_proxy>(\r
234                          spl::make_shared<recover_consumer_proxy>(\r
235                           spl::make_shared<cadence_guard>(\r
236                            std::move(consumer)))));\r
237 }\r
238 \r
239 const spl::shared_ptr<frame_consumer>& frame_consumer::empty()\r
240 {\r
241         struct empty_frame_consumer : public frame_consumer\r
242         {\r
243                 virtual bool send(const spl::shared_ptr<const data_frame>&) override {return false;}\r
244                 virtual void initialize(const video_format_desc&, int) override{}\r
245                 virtual std::wstring print() const override {return L"empty";}\r
246                 virtual bool has_synchronization_clock() const override {return false;}\r
247                 virtual int buffer_depth() const override {return 0;};\r
248                 virtual int index() const{return -1;}\r
249                 virtual boost::property_tree::wptree info() const override\r
250                 {\r
251                         boost::property_tree::wptree info;\r
252                         info.add(L"type", L"empty-consumer");\r
253                         return info;\r
254                 }\r
255         };\r
256         static spl::shared_ptr<frame_consumer> consumer = spl::make_shared<empty_frame_consumer>();\r
257         return consumer;\r
258 }\r
259 \r
260 }}