2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
22 #include "../../StdAfx.h"
24 #include "synchronizing_consumer.h"
26 #include <common/log/log.h>
27 #include <common/diagnostics/graph.h>
28 #include <common/concurrency/future_util.h>
30 #include <core/video_format.h>
32 #include <boost/range/adaptor/transformed.hpp>
33 #include <boost/range/algorithm/min_element.hpp>
34 #include <boost/range/algorithm/max_element.hpp>
35 #include <boost/range/algorithm/for_each.hpp>
36 #include <boost/range/algorithm/count_if.hpp>
37 #include <boost/range/numeric.hpp>
38 #include <boost/algorithm/string/join.hpp>
39 #include <boost/thread/future.hpp>
46 #include <tbb/atomic.h>
48 namespace caspar { namespace core {
50 using namespace boost::adaptors;
// Base class that wraps another frame_consumer and forwards every
// frame_consumer call to it through get_delegate(). Derived adapters in this
// file override selected members (send, print, info) to add behaviour.
// NOTE(review): this listing omits brace-only lines and the bodies of the
// constructor and get_delegate(); those are described from signatures only.
52 class delegating_frame_consumer : public frame_consumer
// The wrapped consumer.
54 safe_ptr<frame_consumer> consumer_;
// Takes the consumer to wrap (presumably stored in consumer_ -- initializer not visible here).
56 delegating_frame_consumer(const safe_ptr<frame_consumer>& consumer)
// Mutable access to the delegate that the forwarding members call into.
61 frame_consumer& get_delegate()
// Read-only access to the delegate, used by the const forwarding members.
66 const frame_consumer& get_delegate() const
// Forwards initialization (video format and channel index) to the delegate.
71 virtual void initialize(
72 const video_format_desc& format_desc, int channel_index) override
74 get_delegate().initialize(format_desc, channel_index);
// Forwards to the delegate.
// NOTE(review): unlike its siblings this member is not marked `override` -- confirm whether that is intentional.
77 virtual int64_t presentation_frame_age_millis() const
79 return get_delegate().presentation_frame_age_millis();
// Forwards the frame to the delegate and returns the delegate's future unchanged.
82 virtual boost::unique_future<bool> send(
83 const safe_ptr<read_frame>& frame) override
85 return get_delegate().send(frame);
// Returns the delegate's printable name.
88 virtual std::wstring print() const override
90 return get_delegate().print();
// Returns the delegate's info property tree.
93 virtual boost::property_tree::wptree info() const override
95 return get_delegate().info();
// Reports whether the delegate provides a synchronization clock.
98 virtual bool has_synchronization_clock() const override
100 return get_delegate().has_synchronization_clock();
// Returns the delegate's buffer depth.
103 virtual size_t buffer_depth() const override
105 return get_delegate().buffer_depth();
// Returns the delegate's index.
108 virtual int index() const override
110 return get_delegate().index();
// Palette of six diagnostics colors, built once with boost::assign::list_of
// and kept in a function-local static. The implementation's initialize()
// indexes it modulo its size to pick a color per consumer.
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns `colors`.
114 const std::vector<int>& diag_colors()
116 static std::vector<int> colors = boost::assign::list_of<int>
117 (diagnostics::color(0.0f, 0.6f, 0.9f))
118 (diagnostics::color(0.6f, 0.3f, 0.3f))
119 (diagnostics::color(0.3f, 0.6f, 0.3f))
120 (diagnostics::color(0.4f, 0.3f, 0.8f))
121 (diagnostics::color(0.9f, 0.9f, 0.5f))
122 (diagnostics::color(0.2f, 0.9f, 0.9f));
// Adapter that keeps frames in a queue in front of the wrapped consumer and
// always sends the frame at the front of that queue to the delegate.
// duplicate_next() lets the owner request that upcoming frames be duplicated.
// NOTE(review): this listing omits brace-only lines and several statements
// (constructor body, the branches inside send()/consume_one()); comments
// below describe only what is visible.
127 class buffering_consumer_adapter : public delegating_frame_consumer
// Frames held back from the delegate; the front element is what gets sent.
129 std::queue<safe_ptr<read_frame>> buffer_;
// Atomic counter reported (minus one) by num_buffered().
130 tbb::atomic<size_t> buffered_;
// Atomic count accumulated by duplicate_next().
131 tbb::atomic<int64_t> duplicate_next_;
// Wraps `consumer` via the delegating base class.
133 buffering_consumer_adapter(const safe_ptr<frame_consumer>& consumer)
134 : delegating_frame_consumer(consumer)
// Sends the frame at the front of the queue to the delegate without taking a
// new input frame. The body of the `if` is not visible here.
// NOTE(review): buffer_.front() is called unconditionally -- confirm the
// queue cannot be empty at that point.
140 boost::unique_future<bool> consume_one()
142 if (!buffer_.empty())
148 return get_delegate().send(buffer_.front());
// Accepts a new frame and sends the frame at the front of the queue to the
// delegate. Only the `else if` of the branching is visible in this listing;
// presumably the preceding `if` handles pending duplicates -- TODO confirm.
151 virtual boost::unique_future<bool> send(
152 const safe_ptr<read_frame>& frame) override
158 else if (!buffer_.empty())
167 return get_delegate().send(buffer_.front());
// Adds `to_duplicate` to the pending duplicate count.
170 void duplicate_next(int64_t to_duplicate)
172 duplicate_next_ += to_duplicate;
// Returns buffered_ - 1.
// NOTE(review): the result is size_t, so this wraps around if buffered_ is 0
// -- confirm buffered_ is always at least 1 when this is called.
175 size_t num_buffered() const
177 return buffered_ - 1;
// Delegate's name wrapped as "buffering[...]".
180 virtual std::wstring print() const override
182 return L"buffering[" + get_delegate().print() + L"]";
// Property tree with this adapter's type, the delegate's info as a child and
// the current num_buffered() value.
185 virtual boost::property_tree::wptree info() const override
187 boost::property_tree::wptree info;
189 info.add(L"type", L"buffering-consumer-adapter");
190 info.add_child(L"consumer", get_delegate().info());
191 info.add(L"buffered-frames", num_buffered());
// Upper bound on (frames already buffered + frames to duplicate) for a single
// consumer; synchronizing_consumer::implementation::send reduces the number
// of frames to duplicate when the sum would exceed this value.
197 static const uint64_t MAX_BUFFERED_OUT_OF_MEMORY_GUARD = 5;
// Implementation of synchronizing_consumer: sends each frame to a set of
// consumers (each wrapped in a buffering_consumer_adapter), compares their
// presentation frame ages, and asks the consumer with the smallest age to
// duplicate frames when the ages differ by at least one frame duration.
// NOTE(review): this listing omits brace-only lines and several statements,
// so nesting and some conditions are not visible; comments describe only the
// visible lines.
199 struct synchronizing_consumer::implementation
// The wrapped consumers, one buffering adapter per original consumer.
202 std::vector<safe_ptr<buffering_consumer_adapter>> consumers_;
// Largest buffer_depth() among the consumers (computed in the constructor).
203 size_t buffer_depth_;
// True if at least one consumer reports a synchronization clock.
204 bool has_synchronization_clock_;
// Futures returned by the consumers for the frame passed to send().
205 std::vector<boost::unique_future<bool>> results_;
// Promise backing the future returned from send(); replaced on every send().
206 boost::promise<bool> promise_;
// Video format stored by initialize(); its fps is used to derive the frame duration.
207 video_format_desc format_desc_;
// Diagnostics graph registered in the constructor and labelled in initialize().
208 safe_ptr<diagnostics::graph> graph_;
// Counter compared against 0 and 1 in send() and set again after frames are
// duplicated. The lines that update it between those checks are not visible here.
209 int64_t grace_period_;
// Most recent (max age - min age) computed in send(); reported by info().
210 tbb::atomic<int64_t> current_diff_;
// Wraps every consumer in a buffering_consumer_adapter, derives buffer_depth_
// and has_synchronization_clock_ from the originals, and registers the graph.
// NOTE(review): boost::max_element on an empty `depths` would be dereferenced
// at end() -- confirm callers never pass an empty consumer list.
212 implementation(const std::vector<safe_ptr<frame_consumer>>& consumers)
215 BOOST_FOREACH(auto& consumer, consumers)
216 consumers_.push_back(make_safe<buffering_consumer_adapter>(consumer));
219 auto buffer_depths = consumers | transformed(std::mem_fn(&frame_consumer::buffer_depth));
220 std::vector<size_t> depths(buffer_depths.begin(), buffer_depths.end());
221 buffer_depth_ = *boost::max_element(depths);
222 has_synchronization_clock_ = boost::count_if(consumers, std::mem_fn(&frame_consumer::has_synchronization_clock)) > 0;
224 diagnostics::register_graph(graph_);
// Sends `frame` to every consumer and returns a future whose wait callback
// performs the synchronization check before fulfilling the promise with true.
227 boost::unique_future<bool> send(const safe_ptr<read_frame>& frame)
// Hand the frame to each consumer and keep the resulting futures.
231 BOOST_FOREACH(auto& consumer, consumers_)
232 results_.push_back(consumer->send(frame));
// Fresh promise for this frame; the lambda below is installed as its wait callback.
234 promise_ = boost::promise<bool>();
235 promise_.set_wait_callback(std::function<void(boost::promise<bool>&)>([this](boost::promise<bool>& promise)
// Loop over the per-consumer futures (loop body not visible in this listing).
237 BOOST_FOREACH(auto& result, results_)
// Collect every consumer's presentation frame age and locate the extremes.
242 auto frame_ages = consumers_ | transformed(std::mem_fn(&frame_consumer::presentation_frame_age_millis));
243 std::vector<int64_t> ages(frame_ages.begin(), frame_ages.end());
244 auto max_age_iter = boost::max_element(ages);
245 auto min_age_iter = boost::min_element(ages);
246 int64_t min_age = *min_age_iter;
250 // One of the consumers has no measurement yet; wait until the next
251 // frame before making any assumptions.
252 promise.set_value(true);
256 int64_t max_age = *max_age_iter;
257 int64_t age_diff = max_age - min_age;
// Publish the latest difference for info().
259 current_diff_ = age_diff;
// Per-consumer loop; the visible arguments are the consumer's name and its
// age as a fraction of the maximum age (the call they belong to is not visible).
261 for (unsigned i = 0; i < ages.size(); ++i)
263 narrow(consumers_[i]->print()),
264 static_cast<double>(ages[i]) / *max_age_iter);
266 bool grace_period_over = grace_period_ == 1;
271 if (grace_period_ == 0)
// Duration of one frame in milliseconds, truncated to an integer.
273 int64_t frame_duration = static_cast<int64_t>(1000 / format_desc_.fps);
// Ages differ by a whole frame or more: make the consumer with the smallest
// age duplicate frames.
275 if (age_diff >= frame_duration)
277 CASPAR_LOG(info) << print() << L" Consumers not in sync. min: " << min_age << L" max: " << max_age;
279 auto index = min_age_iter - ages.begin();
280 auto to_duplicate = age_diff / frame_duration;
281 auto& consumer = *consumers_.at(index);
283 auto currently_buffered = consumer.num_buffered();
// Limit the duplication so the consumer does not buffer beyond the guard.
// NOTE(review): the subtraction below is done in unsigned arithmetic; confirm
// the result is as intended if currently_buffered already exceeds the guard.
285 if (currently_buffered + to_duplicate > MAX_BUFFERED_OUT_OF_MEMORY_GUARD)
287 CASPAR_LOG(info) << print() << L" Protecting from out of memory. Duplicating less frames than calculated";
289 to_duplicate = MAX_BUFFERED_OUT_OF_MEMORY_GUARD - currently_buffered;
292 consumer.duplicate_next(to_duplicate);
// Restart the grace period, sized by the duplicated frames and buffer depth.
294 grace_period_ = 10 + to_duplicate + buffer_depth_;
296 else if (grace_period_over)
298 CASPAR_LOG(info) << print() << L" Consumers resynced. min: " << min_age << L" max: " << max_age;
302 blocking_consume_unnecessarily_buffered();
304 promise.set_value(true);
307 return promise_.get_future();
// Finds the smallest num_buffered() across all consumers and, per the log
// message, consumes that many frames from every consumer via consume_one().
// The loop and the condition around the log are not visible in this listing.
310 void blocking_consume_unnecessarily_buffered()
312 auto buffered = consumers_ | transformed(std::mem_fn(&buffering_consumer_adapter::num_buffered));
313 std::vector<size_t> num_buffered(buffered.begin(), buffered.end());
314 auto min_buffered = *boost::min_element(num_buffered);
317 CASPAR_LOG(info) << print() << L" " << min_buffered
318 << L" frames unnecessarily buffered. Consuming and letting channel pause during that time.";
322 std::vector<boost::unique_future<bool>> results;
324 BOOST_FOREACH(auto& consumer, consumers_)
325 results.push_back(consumer->consume_one());
// Loop over the futures just collected (loop body not visible in this listing).
327 BOOST_FOREACH(auto& result, results)
// Initializes every consumer with the format and channel index, labels the
// graph, and stores the format. The visible arguments at 341-342 pair each
// consumer's name with a palette color chosen by index modulo palette size.
334 void initialize(const video_format_desc& format_desc, int channel_index)
336 for (size_t i = 0; i < consumers_.size(); ++i)
338 auto& consumer = consumers_.at(i);
339 consumer->initialize(format_desc, channel_index);
341 narrow(consumer->print()),
342 diag_colors().at(i % diag_colors().size()));
345 graph_->set_text(print());
346 format_desc_ = format_desc;
// Folds the consumers' presentation frame ages with std::max (the initial
// value of `result` and the return statement are not visible here).
349 int64_t presentation_frame_age_millis() const
353 BOOST_FOREACH(auto& consumer, consumers_)
354 result = std::max(result, consumer->presentation_frame_age_millis());
// "synchronized[" + consumer names joined by "|" + "]".
359 std::wstring print() const
361 return L"synchronized[" + boost::algorithm::join(consumers_ | transformed(std::mem_fn(&frame_consumer::print)), L"|") + L"]";
// Property tree with this consumer's type, one "consumer" child per wrapped
// consumer, and the latest age difference.
364 boost::property_tree::wptree info() const
366 boost::property_tree::wptree info;
368 info.add(L"type", L"synchronized-consumer");
370 BOOST_FOREACH(auto& consumer, consumers_)
371 info.add_child(L"consumer", consumer->info());
373 info.add(L"age-diff", current_diff_);
// Returns the flag computed in the constructor.
378 bool has_synchronization_clock() const
380 return has_synchronization_clock_;
// Returns the maximum buffer depth computed in the constructor.
383 size_t buffer_depth() const
385 return buffer_depth_;
// 10000 plus the sum of all consumers' indices (the enclosing function header
// is not visible in this listing; presumably index()).
390 return boost::accumulate(consumers_ | transformed(std::mem_fn(&frame_consumer::index)), 10000);
// Creates the implementation object that owns and synchronizes `consumers`.
394 synchronizing_consumer::synchronizing_consumer(const std::vector<safe_ptr<frame_consumer>>& consumers)
395 : impl_(new implementation(consumers))
// Forwards the frame to impl_ and returns its future.
399 boost::unique_future<bool> synchronizing_consumer::send(const safe_ptr<read_frame>& frame)
401 return impl_->send(frame);
// Forwards the video format and channel index to impl_.
404 void synchronizing_consumer::initialize(const video_format_desc& format_desc, int channel_index)
406 impl_->initialize(format_desc, channel_index);
// Returns the presentation frame age reported by impl_.
409 int64_t synchronizing_consumer::presentation_frame_age_millis() const
411 return impl_->presentation_frame_age_millis();
// Returns the printable name built by impl_.
414 std::wstring synchronizing_consumer::print() const
416 return impl_->print();
// Returns the info property tree built by impl_.
419 boost::property_tree::wptree synchronizing_consumer::info() const
421 return impl_->info();
// Returns whether impl_ reports a synchronization clock.
424 bool synchronizing_consumer::has_synchronization_clock() const
426 return impl_->has_synchronization_clock();
// Returns the buffer depth reported by impl_.
429 size_t synchronizing_consumer::buffer_depth() const
431 return impl_->buffer_depth();
// Returns the index computed by impl_.
434 int synchronizing_consumer::index() const
436 return impl_->index();