]> git.sesse.net Git - casparcg/blob - core/consumer/synchronizing/synchronizing_consumer.cpp
Colorized diagnostics of synchronizing consumer
[casparcg] / core / consumer / synchronizing / synchronizing_consumer.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "../../StdAfx.h"
23
24 #include "synchronizing_consumer.h"
25
26 #include <common/log/log.h>
27 #include <common/diagnostics/graph.h>
28 #include <common/concurrency/future_util.h>
29
30 #include <core/video_format.h>
31
32 #include <boost/range/adaptor/transformed.hpp>
33 #include <boost/range/algorithm/min_element.hpp>
34 #include <boost/range/algorithm/max_element.hpp>
35 #include <boost/range/algorithm/for_each.hpp>
36 #include <boost/range/algorithm/count_if.hpp>
37 #include <boost/range/numeric.hpp>
38 #include <boost/algorithm/string/join.hpp>
39 #include <boost/thread/future.hpp>
40
41 #include <functional>
42 #include <vector>
43 #include <queue>
44 #include <utility>
45
46 #include <tbb/atomic.h>
47
48 namespace caspar { namespace core {
49
50 using namespace boost::adaptors;
51
52 class delegating_frame_consumer : public frame_consumer
53 {
54         safe_ptr<frame_consumer> consumer_;
55 public:
56         delegating_frame_consumer(const safe_ptr<frame_consumer>& consumer)
57                 : consumer_(consumer)
58         {
59         }
60
61         frame_consumer& get_delegate()
62         {
63                 return *consumer_;
64         }
65
66         const frame_consumer& get_delegate() const
67         {
68                 return *consumer_;
69         }
70         
71         virtual void initialize(
72                         const video_format_desc& format_desc, int channel_index) override
73         {
74                 get_delegate().initialize(format_desc, channel_index);
75         }
76
77         virtual int64_t presentation_frame_age_millis() const
78         {
79                 return get_delegate().presentation_frame_age_millis();
80         }
81
82         virtual boost::unique_future<bool> send(
83                         const safe_ptr<read_frame>& frame) override
84         {               
85                 return get_delegate().send(frame);
86         }
87
88         virtual std::wstring print() const override
89         {
90                 return get_delegate().print();
91         }
92
93         virtual boost::property_tree::wptree info() const override
94         {
95                 return get_delegate().info();
96         }
97
98         virtual bool has_synchronization_clock() const override
99         {
100                 return get_delegate().has_synchronization_clock();
101         }
102
103         virtual size_t buffer_depth() const override
104         {
105                 return get_delegate().buffer_depth();
106         }
107
108         virtual int index() const override
109         {
110                 return get_delegate().index();
111         }
112 };
113
114 const std::vector<int>& diag_colors()
115 {
116         static std::vector<int> colors = boost::assign::list_of<int>
117                         (diagnostics::color(0.0f, 0.6f, 0.9f))
118                         (diagnostics::color(0.6f, 0.3f, 0.3f))
119                         (diagnostics::color(0.3f, 0.6f, 0.3f))
120                         (diagnostics::color(0.4f, 0.3f, 0.8f))
121                         (diagnostics::color(0.9f, 0.9f, 0.5f))
122                         (diagnostics::color(0.2f, 0.9f, 0.9f));
123
124         return colors;
125 }
126
127 class buffering_consumer_adapter : public delegating_frame_consumer
128 {
129         std::queue<safe_ptr<read_frame>>        buffer_;
130         tbb::atomic<size_t>                                     buffered_;
131         tbb::atomic<int64_t>                            duplicate_next_;
132 public:
133         buffering_consumer_adapter(const safe_ptr<frame_consumer>& consumer)
134                 : delegating_frame_consumer(consumer)
135         {
136                 buffered_ = 0;
137                 duplicate_next_ = 0;
138         }
139
140         boost::unique_future<bool> consume_one()
141         {
142                 if (!buffer_.empty())
143                 {
144                         buffer_.pop();
145                         --buffered_;
146                 }
147
148                 return get_delegate().send(buffer_.front());
149         }
150
151         virtual boost::unique_future<bool> send(
152                         const safe_ptr<read_frame>& frame) override
153         {
154                 if (duplicate_next_)
155                 {
156                         --duplicate_next_;
157                 }
158                 else if (!buffer_.empty())
159                 {
160                         buffer_.pop();
161                         --buffered_;
162                 }
163
164                 buffer_.push(frame);
165                 ++buffered_;
166
167                 return get_delegate().send(buffer_.front());
168         }
169
170         void duplicate_next(int64_t to_duplicate)
171         {
172                 duplicate_next_ += to_duplicate;
173         }
174
175         size_t num_buffered() const
176         {
177                 return buffered_ - 1;
178         }
179
180         virtual std::wstring print() const override
181         {
182                 return L"buffering[" + get_delegate().print() + L"]";
183         }
184
185         virtual boost::property_tree::wptree info() const override
186         {
187                 boost::property_tree::wptree info;
188
189                 info.add(L"type", L"buffering-consumer-adapter");
190                 info.add_child(L"consumer", get_delegate().info());
191                 info.add(L"buffered-frames", num_buffered());
192
193                 return info;
194         }
195 };
196
197 static const uint64_t MAX_BUFFERED_OUT_OF_MEMORY_GUARD = 5;
198
199 struct synchronizing_consumer::implementation
200 {
201 private:
202         std::vector<safe_ptr<buffering_consumer_adapter>>       consumers_;
203         size_t                                                                                          buffer_depth_;
204         bool                                                                                            has_synchronization_clock_;
205         std::vector<boost::unique_future<bool>>                         results_;
206         boost::promise<bool>                                                            promise_;
207         video_format_desc                                                                       format_desc_;
208         safe_ptr<diagnostics::graph>                                            graph_;
209         int64_t                                                                                         grace_period_;
210         tbb::atomic<int64_t>                                                            current_diff_;
211 public:
212         implementation(const std::vector<safe_ptr<frame_consumer>>& consumers)
213                 : grace_period_(0)
214         {
215                 BOOST_FOREACH(auto& consumer, consumers)
216                         consumers_.push_back(make_safe<buffering_consumer_adapter>(consumer));
217
218                 current_diff_ = 0;
219                 auto buffer_depths = consumers | transformed(std::mem_fn(&frame_consumer::buffer_depth));
220                 std::vector<size_t> depths(buffer_depths.begin(), buffer_depths.end());
221                 buffer_depth_ = *boost::max_element(depths);
222                 has_synchronization_clock_ = boost::count_if(consumers, std::mem_fn(&frame_consumer::has_synchronization_clock)) > 0;
223
224                 diagnostics::register_graph(graph_);
225         }
226
227         boost::unique_future<bool> send(const safe_ptr<read_frame>& frame)
228         {
229                 results_.clear();
230
231                 BOOST_FOREACH(auto& consumer, consumers_)
232                         results_.push_back(consumer->send(frame));
233
234                 promise_ = boost::promise<bool>();
235                 promise_.set_wait_callback(std::function<void(boost::promise<bool>&)>([this](boost::promise<bool>& promise)
236                 {
237                         BOOST_FOREACH(auto& result, results_)
238                         {
239                                 result.get();
240                         }
241
242                         auto frame_ages = consumers_ | transformed(std::mem_fn(&frame_consumer::presentation_frame_age_millis));
243                         std::vector<int64_t> ages(frame_ages.begin(), frame_ages.end());
244                         auto max_age_iter = boost::max_element(ages);
245                         auto min_age_iter = boost::min_element(ages);
246                         int64_t min_age = *min_age_iter;
247                         
248                         if (min_age == 0)
249                         {
250                                 // One of the consumers have yet no measurement, wait until next 
251                                 // frame until we make any assumptions.
252                                 promise.set_value(true);
253                                 return;
254                         }
255
256                         int64_t max_age = *max_age_iter;
257                         int64_t age_diff = max_age - min_age;
258
259                         current_diff_ = age_diff;
260
261                         for (unsigned i = 0; i < ages.size(); ++i)
262                                 graph_->set_value(
263                                                 narrow(consumers_[i]->print()),
264                                                 static_cast<double>(ages[i]) / *max_age_iter);
265
266                         bool grace_period_over = grace_period_ == 1;
267
268                         if (grace_period_)
269                                 --grace_period_;
270
271                         if (grace_period_ == 0)
272                         {
273                                 int64_t frame_duration = static_cast<int64_t>(1000 / format_desc_.fps);
274
275                                 if (age_diff >= frame_duration)
276                                 {
277                                         CASPAR_LOG(info) << print() << L" Consumers not in sync. min: " << min_age << L" max: " << max_age;
278
279                                         auto index = min_age_iter - ages.begin();
280                                         auto to_duplicate = age_diff / frame_duration;
281                                         auto& consumer = *consumers_.at(index);
282
283                                         auto currently_buffered = consumer.num_buffered();
284
285                                         if (currently_buffered + to_duplicate > MAX_BUFFERED_OUT_OF_MEMORY_GUARD)
286                                         {
287                                                 CASPAR_LOG(info) << print() << L" Protecting from out of memory. Duplicating less frames than calculated";
288
289                                                 to_duplicate = MAX_BUFFERED_OUT_OF_MEMORY_GUARD - currently_buffered;
290                                         }
291
292                                         consumer.duplicate_next(to_duplicate);
293
294                                         grace_period_ = 10 + to_duplicate + buffer_depth_;
295                                 }
296                                 else if (grace_period_over)
297                                 {
298                                         CASPAR_LOG(info) << print() << L" Consumers resynced. min: " << min_age << L" max: " << max_age;
299                                 }
300                         }
301
302                         blocking_consume_unnecessarily_buffered();
303
304                         promise.set_value(true);
305                 }));
306
307                 return promise_.get_future();
308         }
309
310         void blocking_consume_unnecessarily_buffered()
311         {
312                 auto buffered = consumers_ | transformed(std::mem_fn(&buffering_consumer_adapter::num_buffered));
313                 std::vector<size_t> num_buffered(buffered.begin(), buffered.end());
314                 auto min_buffered = *boost::min_element(num_buffered);
315
316                 if (min_buffered)
317                         CASPAR_LOG(info) << print() << L" " << min_buffered 
318                                         << L" frames unnecessarily buffered. Consuming and letting channel pause during that time.";
319
320                 while (min_buffered)
321                 {
322                         std::vector<boost::unique_future<bool>> results;
323
324                         BOOST_FOREACH(auto& consumer, consumers_)
325                                 results.push_back(consumer->consume_one());
326
327                         BOOST_FOREACH(auto& result, results)
328                                 result.get();
329
330                         --min_buffered;
331                 }
332         }
333
334         void initialize(const video_format_desc& format_desc, int channel_index)
335         {
336                 for (size_t i = 0; i < consumers_.size(); ++i)
337                 {
338                         auto& consumer = consumers_.at(i);
339                         consumer->initialize(format_desc, channel_index); 
340                         graph_->set_color(
341                                         narrow(consumer->print()),
342                                         diag_colors().at(i % diag_colors().size()));
343                 }
344
345                 graph_->set_text(print());
346                 format_desc_ = format_desc;
347         }
348
349         int64_t presentation_frame_age_millis() const
350         {
351                 int64_t result = 0;
352
353                 BOOST_FOREACH(auto& consumer, consumers_)
354                         result = std::max(result, consumer->presentation_frame_age_millis());
355
356                 return result;
357         }
358
359         std::wstring print() const
360         {
361                 return L"synchronized[" + boost::algorithm::join(consumers_ | transformed(std::mem_fn(&frame_consumer::print)), L"|") +  L"]";
362         }
363
364         boost::property_tree::wptree info() const
365         {
366                 boost::property_tree::wptree info;
367
368                 info.add(L"type", L"synchronized-consumer");
369
370                 BOOST_FOREACH(auto& consumer, consumers_)
371                         info.add_child(L"consumer", consumer->info());
372
373                 info.add(L"age-diff", current_diff_);
374
375                 return info;
376         }
377
378         bool has_synchronization_clock() const
379         {
380                 return has_synchronization_clock_;
381         }
382
383         size_t buffer_depth() const
384         {
385                 return buffer_depth_;
386         }
387
388         int index() const
389         {
390                 return boost::accumulate(consumers_ | transformed(std::mem_fn(&frame_consumer::index)), 10000);
391         }
392 };
393
394 synchronizing_consumer::synchronizing_consumer(const std::vector<safe_ptr<frame_consumer>>& consumers)
395         : impl_(new implementation(consumers))
396 {
397 }
398
399 boost::unique_future<bool> synchronizing_consumer::send(const safe_ptr<read_frame>& frame)
400 {
401         return impl_->send(frame);
402 }
403
404 void synchronizing_consumer::initialize(const video_format_desc& format_desc, int channel_index)
405 {
406         impl_->initialize(format_desc, channel_index);
407 }
408
409 int64_t synchronizing_consumer::presentation_frame_age_millis() const
410 {
411         return impl_->presentation_frame_age_millis();
412 }
413
414 std::wstring synchronizing_consumer::print() const
415 {
416         return impl_->print();
417 }
418
419 boost::property_tree::wptree synchronizing_consumer::info() const
420 {
421         return impl_->info();
422 }
423
424 bool synchronizing_consumer::has_synchronization_clock() const
425 {
426         return impl_->has_synchronization_clock();
427 }
428
429 size_t synchronizing_consumer::buffer_depth() const
430 {
431         return impl_->buffer_depth();
432 }
433
434 int synchronizing_consumer::index() const
435 {
436         return impl_->index();
437 }
438
439 }}