2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
22 #include "../../StdAfx.h"
24 #include "synchronizing_consumer.h"
26 #include <common/log/log.h>
27 #include <common/diagnostics/graph.h>
28 #include <common/concurrency/future_util.h>
30 #include <core/video_format.h>
32 #include <boost/range/adaptor/transformed.hpp>
33 #include <boost/range/algorithm/min_element.hpp>
34 #include <boost/range/algorithm/max_element.hpp>
35 #include <boost/range/algorithm/for_each.hpp>
36 #include <boost/range/algorithm/count_if.hpp>
37 #include <boost/range/numeric.hpp>
38 #include <boost/algorithm/string/join.hpp>
39 #include <boost/thread/future.hpp>
46 #include <tbb/atomic.h>
48 namespace caspar { namespace core {
50 using namespace boost::adaptors;
52 class delegating_frame_consumer : public frame_consumer
54 safe_ptr<frame_consumer> consumer_;
56 delegating_frame_consumer(const safe_ptr<frame_consumer>& consumer)
61 frame_consumer& get_delegate()
66 const frame_consumer& get_delegate() const
71 virtual void initialize(
72 const video_format_desc& format_desc, int channel_index) override
74 get_delegate().initialize(format_desc, channel_index);
77 virtual int64_t presentation_frame_age_millis() const
79 return get_delegate().presentation_frame_age_millis();
82 virtual boost::unique_future<bool> send(
83 const safe_ptr<read_frame>& frame) override
85 return get_delegate().send(frame);
88 virtual std::wstring print() const override
90 return get_delegate().print();
93 virtual boost::property_tree::wptree info() const override
95 return get_delegate().info();
98 virtual bool has_synchronization_clock() const override
100 return get_delegate().has_synchronization_clock();
103 virtual size_t buffer_depth() const override
105 return get_delegate().buffer_depth();
108 virtual int index() const override
110 return get_delegate().index();
114 class buffering_consumer_adapter : public delegating_frame_consumer
116 std::queue<safe_ptr<read_frame>> buffer_;
117 tbb::atomic<size_t> buffered_;
118 tbb::atomic<int64_t> duplicate_next_;
120 buffering_consumer_adapter(const safe_ptr<frame_consumer>& consumer)
121 : delegating_frame_consumer(consumer)
127 boost::unique_future<bool> consume_one()
129 if (!buffer_.empty())
135 return get_delegate().send(buffer_.front());
138 virtual boost::unique_future<bool> send(
139 const safe_ptr<read_frame>& frame) override
145 else if (!buffer_.empty())
154 return get_delegate().send(buffer_.front());
157 void duplicate_next(int64_t to_duplicate)
159 duplicate_next_ += to_duplicate;
162 size_t num_buffered() const
164 return buffered_ - 1;
167 virtual std::wstring print() const override
169 return L"buffering[" + get_delegate().print() + L"]";
172 virtual boost::property_tree::wptree info() const override
174 boost::property_tree::wptree info;
176 info.add(L"type", L"buffering-consumer-adapter");
177 info.add_child(L"consumer", get_delegate().info());
178 info.add(L"buffered-frames", num_buffered());
184 static const uint64_t MAX_BUFFERED_OUT_OF_MEMORY_GUARD = 5;
186 struct synchronizing_consumer::implementation
189 std::vector<safe_ptr<buffering_consumer_adapter>> consumers_;
190 size_t buffer_depth_;
191 bool has_synchronization_clock_;
192 std::vector<boost::unique_future<bool>> results_;
193 boost::promise<bool> promise_;
194 video_format_desc format_desc_;
195 safe_ptr<diagnostics::graph> graph_;
196 int64_t grace_period_;
197 tbb::atomic<int64_t> current_diff_;
199 implementation(const std::vector<safe_ptr<frame_consumer>>& consumers)
202 BOOST_FOREACH(auto& consumer, consumers)
203 consumers_.push_back(make_safe<buffering_consumer_adapter>(consumer));
206 auto buffer_depths = consumers | transformed(std::mem_fn(&frame_consumer::buffer_depth));
207 std::vector<size_t> depths(buffer_depths.begin(), buffer_depths.end());
208 buffer_depth_ = *boost::max_element(depths);
209 has_synchronization_clock_ = boost::count_if(consumers, std::mem_fn(&frame_consumer::has_synchronization_clock)) > 0;
211 graph_->set_text(print());
212 diagnostics::register_graph(graph_);
215 boost::unique_future<bool> send(const safe_ptr<read_frame>& frame)
219 BOOST_FOREACH(auto& consumer, consumers_)
220 results_.push_back(consumer->send(frame));
222 promise_ = boost::promise<bool>();
223 promise_.set_wait_callback(std::function<void(boost::promise<bool>&)>([this](boost::promise<bool>& promise)
225 BOOST_FOREACH(auto& result, results_)
230 auto frame_ages = consumers_ | transformed(std::mem_fn(&frame_consumer::presentation_frame_age_millis));
231 std::vector<int64_t> ages(frame_ages.begin(), frame_ages.end());
232 auto max_age_iter = boost::max_element(ages);
233 auto min_age_iter = boost::min_element(ages);
234 int64_t min_age = *min_age_iter;
238 // One of the consumers have yet no measurement, wait until next
239 // frame until we make any assumptions.
240 promise.set_value(true);
244 int64_t max_age = *max_age_iter;
245 int64_t age_diff = max_age - min_age;
247 current_diff_ = age_diff;
249 for (unsigned i = 0; i < ages.size(); ++i)
250 graph_->set_value(narrow(consumers_[i]->print()), static_cast<double>(ages[i]) / *max_age_iter);
252 bool grace_period_over = grace_period_ == 1;
257 if (grace_period_ == 0)
259 int64_t frame_duration = static_cast<int64_t>(1000 / format_desc_.fps);
261 if (age_diff >= frame_duration)
263 CASPAR_LOG(info) << print() << L" Consumers not in sync. min: " << min_age << L" max: " << max_age;
265 auto index = min_age_iter - ages.begin();
266 auto to_duplicate = age_diff / frame_duration;
267 auto& consumer = *consumers_.at(index);
269 auto currently_buffered = consumer.num_buffered();
271 if (currently_buffered + to_duplicate > MAX_BUFFERED_OUT_OF_MEMORY_GUARD)
273 CASPAR_LOG(info) << print() << L" Protecting from out of memory. Duplicating less frames than calculated";
275 to_duplicate = MAX_BUFFERED_OUT_OF_MEMORY_GUARD - currently_buffered;
278 consumer.duplicate_next(to_duplicate);
280 grace_period_ = 10 + to_duplicate + buffer_depth_;
282 else if (grace_period_over)
284 CASPAR_LOG(info) << print() << L" Consumers resynced. min: " << min_age << L" max: " << max_age;
288 blocking_consume_unnecessarily_buffered();
290 promise.set_value(true);
293 return promise_.get_future();
296 void blocking_consume_unnecessarily_buffered()
298 auto buffered = consumers_ | transformed(std::mem_fn(&buffering_consumer_adapter::num_buffered));
299 std::vector<size_t> num_buffered(buffered.begin(), buffered.end());
300 auto min_buffered = *boost::min_element(num_buffered);
303 CASPAR_LOG(info) << print() << L" " << min_buffered
304 << L" frames unnecessarily buffered. Consuming and letting channel pause during that time.";
308 std::vector<boost::unique_future<bool>> results;
310 BOOST_FOREACH(auto& consumer, consumers_)
311 results.push_back(consumer->consume_one());
313 BOOST_FOREACH(auto& result, results)
320 void initialize(const video_format_desc& format_desc, int channel_index)
324 [&] (const safe_ptr<frame_consumer>& consumer)
326 consumer->initialize(format_desc, channel_index);
328 format_desc_ = format_desc;
331 int64_t presentation_frame_age_millis() const
335 BOOST_FOREACH(auto& consumer, consumers_)
336 result = std::max(result, consumer->presentation_frame_age_millis());
341 std::wstring print() const
343 return L"synchronized[" + boost::algorithm::join(consumers_ | transformed(std::mem_fn(&frame_consumer::print)), L"|") + L"]";
346 boost::property_tree::wptree info() const
348 boost::property_tree::wptree info;
350 info.add(L"type", L"synchronized-consumer");
352 BOOST_FOREACH(auto& consumer, consumers_)
353 info.add_child(L"consumer", consumer->info());
355 info.add(L"age-diff", current_diff_);
360 bool has_synchronization_clock() const
362 return has_synchronization_clock_;
365 size_t buffer_depth() const
367 return buffer_depth_;
372 return boost::accumulate(consumers_ | transformed(std::mem_fn(&frame_consumer::index)), 10000);
376 synchronizing_consumer::synchronizing_consumer(const std::vector<safe_ptr<frame_consumer>>& consumers)
377 : impl_(new implementation(consumers))
381 boost::unique_future<bool> synchronizing_consumer::send(const safe_ptr<read_frame>& frame)
383 return impl_->send(frame);
386 void synchronizing_consumer::initialize(const video_format_desc& format_desc, int channel_index)
388 impl_->initialize(format_desc, channel_index);
391 int64_t synchronizing_consumer::presentation_frame_age_millis() const
393 return impl_->presentation_frame_age_millis();
396 std::wstring synchronizing_consumer::print() const
398 return impl_->print();
401 boost::property_tree::wptree synchronizing_consumer::info() const
403 return impl_->info();
406 bool synchronizing_consumer::has_synchronization_clock() const
408 return impl_->has_synchronization_clock();
411 size_t synchronizing_consumer::buffer_depth() const
413 return impl_->buffer_depth();
416 int synchronizing_consumer::index() const
418 return impl_->index();