]> git.sesse.net Git - casparcg/blob - core/consumer/synchronizing/synchronizing_consumer.cpp
d19f3e975e87fd614e7cb6d1223e27e0eb992d8c
[casparcg] / core / consumer / synchronizing / synchronizing_consumer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "../../StdAfx.h"
23
24 #include "synchronizing_consumer.h"
25
26 #include <common/log/log.h>
27 #include <common/diagnostics/graph.h>
28 #include <common/concurrency/future_util.h>
29
30 #include <core/video_format.h>
31
32 #include <boost/range/adaptor/transformed.hpp>
33 #include <boost/range/algorithm/min_element.hpp>
34 #include <boost/range/algorithm/max_element.hpp>
35 #include <boost/range/algorithm/for_each.hpp>
36 #include <boost/range/algorithm/count_if.hpp>
37 #include <boost/range/numeric.hpp>
38 #include <boost/algorithm/string/join.hpp>
39 #include <boost/thread/future.hpp>
40
41 #include <functional>
42 #include <vector>
43 #include <queue>
44 #include <utility>
45
46 #include <tbb/atomic.h>
47
48 namespace caspar { namespace core {
49
50 using namespace boost::adaptors;
51
52 class delegating_frame_consumer : public frame_consumer
53 {
54         safe_ptr<frame_consumer> consumer_;
55 public:
56         delegating_frame_consumer(const safe_ptr<frame_consumer>& consumer)
57                 : consumer_(consumer)
58         {
59         }
60
61         frame_consumer& get_delegate()
62         {
63                 return *consumer_;
64         }
65
66         const frame_consumer& get_delegate() const
67         {
68                 return *consumer_;
69         }
70         
71         virtual void initialize(
72                         const video_format_desc& format_desc, int channel_index) override
73         {
74                 get_delegate().initialize(format_desc, channel_index);
75         }
76
77         virtual int64_t presentation_frame_age_millis() const
78         {
79                 return get_delegate().presentation_frame_age_millis();
80         }
81
82         virtual boost::unique_future<bool> send(
83                         const safe_ptr<read_frame>& frame) override
84         {               
85                 return get_delegate().send(frame);
86         }
87
88         virtual std::wstring print() const override
89         {
90                 return get_delegate().print();
91         }
92
93         virtual boost::property_tree::wptree info() const override
94         {
95                 return get_delegate().info();
96         }
97
98         virtual bool has_synchronization_clock() const override
99         {
100                 return get_delegate().has_synchronization_clock();
101         }
102
103         virtual size_t buffer_depth() const override
104         {
105                 return get_delegate().buffer_depth();
106         }
107
108         virtual int index() const override
109         {
110                 return get_delegate().index();
111         }
112 };
113
114 class buffering_consumer_adapter : public delegating_frame_consumer
115 {
116         std::queue<safe_ptr<read_frame>>        buffer_;
117         tbb::atomic<size_t>                                     buffered_;
118         tbb::atomic<int64_t>                            duplicate_next_;
119 public:
120         buffering_consumer_adapter(const safe_ptr<frame_consumer>& consumer)
121                 : delegating_frame_consumer(consumer)
122         {
123                 buffered_ = 0;
124                 duplicate_next_ = 0;
125         }
126
127         boost::unique_future<bool> consume_one()
128         {
129                 if (!buffer_.empty())
130                 {
131                         buffer_.pop();
132                         --buffered_;
133                 }
134
135                 return get_delegate().send(buffer_.front());
136         }
137
138         virtual boost::unique_future<bool> send(
139                         const safe_ptr<read_frame>& frame) override
140         {
141                 if (duplicate_next_)
142                 {
143                         --duplicate_next_;
144                 }
145                 else if (!buffer_.empty())
146                 {
147                         buffer_.pop();
148                         --buffered_;
149                 }
150
151                 buffer_.push(frame);
152                 ++buffered_;
153
154                 return get_delegate().send(buffer_.front());
155         }
156
157         void duplicate_next(int64_t to_duplicate)
158         {
159                 duplicate_next_ += to_duplicate;
160         }
161
162         size_t num_buffered() const
163         {
164                 return buffered_ - 1;
165         }
166
167         virtual std::wstring print() const override
168         {
169                 return L"buffering[" + get_delegate().print() + L"]";
170         }
171
172         virtual boost::property_tree::wptree info() const override
173         {
174                 boost::property_tree::wptree info;
175
176                 info.add(L"type", L"buffering-consumer-adapter");
177                 info.add_child(L"consumer", get_delegate().info());
178                 info.add(L"buffered-frames", num_buffered());
179
180                 return info;
181         }
182 };
183
184 static const uint64_t MAX_BUFFERED_OUT_OF_MEMORY_GUARD = 5;
185
186 struct synchronizing_consumer::implementation
187 {
188 private:
189         std::vector<safe_ptr<buffering_consumer_adapter>>       consumers_;
190         size_t                                                                                          buffer_depth_;
191         bool                                                                                            has_synchronization_clock_;
192         std::vector<boost::unique_future<bool>>                         results_;
193         boost::promise<bool>                                                            promise_;
194         video_format_desc                                                                       format_desc_;
195         safe_ptr<diagnostics::graph>                                            graph_;
196         int64_t                                                                                         grace_period_;
197         tbb::atomic<int64_t>                                                            current_diff_;
198 public:
199         implementation(const std::vector<safe_ptr<frame_consumer>>& consumers)
200                 : grace_period_(0)
201         {
202                 BOOST_FOREACH(auto& consumer, consumers)
203                         consumers_.push_back(make_safe<buffering_consumer_adapter>(consumer));
204
205                 current_diff_ = 0;
206                 auto buffer_depths = consumers | transformed(std::mem_fn(&frame_consumer::buffer_depth));
207                 std::vector<size_t> depths(buffer_depths.begin(), buffer_depths.end());
208                 buffer_depth_ = *boost::max_element(depths);
209                 has_synchronization_clock_ = boost::count_if(consumers, std::mem_fn(&frame_consumer::has_synchronization_clock)) > 0;
210
211                 graph_->set_text(print());
212                 diagnostics::register_graph(graph_);
213         }
214
215         boost::unique_future<bool> send(const safe_ptr<read_frame>& frame)
216         {
217                 results_.clear();
218
219                 BOOST_FOREACH(auto& consumer, consumers_)
220                         results_.push_back(consumer->send(frame));
221
222                 promise_ = boost::promise<bool>();
223                 promise_.set_wait_callback(std::function<void(boost::promise<bool>&)>([this](boost::promise<bool>& promise)
224                 {
225                         BOOST_FOREACH(auto& result, results_)
226                         {
227                                 result.get();
228                         }
229
230                         auto frame_ages = consumers_ | transformed(std::mem_fn(&frame_consumer::presentation_frame_age_millis));
231                         std::vector<int64_t> ages(frame_ages.begin(), frame_ages.end());
232                         auto max_age_iter = boost::max_element(ages);
233                         auto min_age_iter = boost::min_element(ages);
234                         int64_t min_age = *min_age_iter;
235                         
236                         if (min_age == 0)
237                         {
238                                 // One of the consumers have yet no measurement, wait until next 
239                                 // frame until we make any assumptions.
240                                 promise.set_value(true);
241                                 return;
242                         }
243
244                         int64_t max_age = *max_age_iter;
245                         int64_t age_diff = max_age - min_age;
246
247                         current_diff_ = age_diff;
248
249                         for (unsigned i = 0; i < ages.size(); ++i)
250                                 graph_->set_value(narrow(consumers_[i]->print()), static_cast<double>(ages[i]) / *max_age_iter);
251
252                         bool grace_period_over = grace_period_ == 1;
253
254                         if (grace_period_)
255                                 --grace_period_;
256
257                         if (grace_period_ == 0)
258                         {
259                                 int64_t frame_duration = static_cast<int64_t>(1000 / format_desc_.fps);
260
261                                 if (age_diff >= frame_duration)
262                                 {
263                                         CASPAR_LOG(info) << print() << L" Consumers not in sync. min: " << min_age << L" max: " << max_age;
264
265                                         auto index = min_age_iter - ages.begin();
266                                         auto to_duplicate = age_diff / frame_duration;
267                                         auto& consumer = *consumers_.at(index);
268
269                                         auto currently_buffered = consumer.num_buffered();
270
271                                         if (currently_buffered + to_duplicate > MAX_BUFFERED_OUT_OF_MEMORY_GUARD)
272                                         {
273                                                 CASPAR_LOG(info) << print() << L" Protecting from out of memory. Duplicating less frames than calculated";
274
275                                                 to_duplicate = MAX_BUFFERED_OUT_OF_MEMORY_GUARD - currently_buffered;
276                                         }
277
278                                         consumer.duplicate_next(to_duplicate);
279
280                                         grace_period_ = 10 + to_duplicate + buffer_depth_;
281                                 }
282                                 else if (grace_period_over)
283                                 {
284                                         CASPAR_LOG(info) << print() << L" Consumers resynced. min: " << min_age << L" max: " << max_age;
285                                 }
286                         }
287
288                         blocking_consume_unnecessarily_buffered();
289
290                         promise.set_value(true);
291                 }));
292
293                 return promise_.get_future();
294         }
295
296         void blocking_consume_unnecessarily_buffered()
297         {
298                 auto buffered = consumers_ | transformed(std::mem_fn(&buffering_consumer_adapter::num_buffered));
299                 std::vector<size_t> num_buffered(buffered.begin(), buffered.end());
300                 auto min_buffered = *boost::min_element(num_buffered);
301
302                 if (min_buffered)
303                         CASPAR_LOG(info) << print() << L" " << min_buffered 
304                                         << L" frames unnecessarily buffered. Consuming and letting channel pause during that time.";
305
306                 while (min_buffered)
307                 {
308                         std::vector<boost::unique_future<bool>> results;
309
310                         BOOST_FOREACH(auto& consumer, consumers_)
311                                 results.push_back(consumer->consume_one());
312
313                         BOOST_FOREACH(auto& result, results)
314                                 result.get();
315
316                         --min_buffered;
317                 }
318         }
319
320         void initialize(const video_format_desc& format_desc, int channel_index)
321         {
322                 boost::for_each(
323                                 consumers_, 
324                                 [&] (const safe_ptr<frame_consumer>& consumer) 
325                                 {
326                                         consumer->initialize(format_desc, channel_index); 
327                                 });
328                 format_desc_ = format_desc;
329         }
330
331         int64_t presentation_frame_age_millis() const
332         {
333                 int64_t result = 0;
334
335                 BOOST_FOREACH(auto& consumer, consumers_)
336                         result = std::max(result, consumer->presentation_frame_age_millis());
337
338                 return result;
339         }
340
341         std::wstring print() const
342         {
343                 return L"synchronized[" + boost::algorithm::join(consumers_ | transformed(std::mem_fn(&frame_consumer::print)), L"|") +  L"]";
344         }
345
346         boost::property_tree::wptree info() const
347         {
348                 boost::property_tree::wptree info;
349
350                 info.add(L"type", L"synchronized-consumer");
351
352                 BOOST_FOREACH(auto& consumer, consumers_)
353                         info.add_child(L"consumer", consumer->info());
354
355                 info.add(L"age-diff", current_diff_);
356
357                 return info;
358         }
359
360         bool has_synchronization_clock() const
361         {
362                 return has_synchronization_clock_;
363         }
364
365         size_t buffer_depth() const
366         {
367                 return buffer_depth_;
368         }
369
370         int index() const
371         {
372                 return boost::accumulate(consumers_ | transformed(std::mem_fn(&frame_consumer::index)), 10000);
373         }
374 };
375
376 synchronizing_consumer::synchronizing_consumer(const std::vector<safe_ptr<frame_consumer>>& consumers)
377         : impl_(new implementation(consumers))
378 {
379 }
380
381 boost::unique_future<bool> synchronizing_consumer::send(const safe_ptr<read_frame>& frame)
382 {
383         return impl_->send(frame);
384 }
385
386 void synchronizing_consumer::initialize(const video_format_desc& format_desc, int channel_index)
387 {
388         impl_->initialize(format_desc, channel_index);
389 }
390
391 int64_t synchronizing_consumer::presentation_frame_age_millis() const
392 {
393         return impl_->presentation_frame_age_millis();
394 }
395
396 std::wstring synchronizing_consumer::print() const
397 {
398         return impl_->print();
399 }
400
401 boost::property_tree::wptree synchronizing_consumer::info() const
402 {
403         return impl_->info();
404 }
405
406 bool synchronizing_consumer::has_synchronization_clock() const
407 {
408         return impl_->has_synchronization_clock();
409 }
410
411 size_t synchronizing_consumer::buffer_depth() const
412 {
413         return impl_->buffer_depth();
414 }
415
416 int synchronizing_consumer::index() const
417 {
418         return impl_->index();
419 }
420
421 }}