2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
\r
4 * This file is part of CasparCG (www.casparcg.com).
\r
6 * CasparCG is free software: you can redistribute it and/or modify
\r
7 * it under the terms of the GNU General Public License as published by
\r
8 * the Free Software Foundation, either version 3 of the License, or
\r
9 * (at your option) any later version.
\r
11 * CasparCG is distributed in the hope that it will be useful,
\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
\r
14 * GNU General Public License for more details.
\r
16 * You should have received a copy of the GNU General Public License
\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
\r
19 * Author: Robert Nagy, ronag89@gmail.com
\r
22 #include "../StdAfx.h"
\r
25 #pragma warning (disable : 4244)
\r
30 #include "../video_format.h"
\r
31 #include "../mixer/gpu/ogl_device.h"
\r
32 #include "../mixer/read_frame.h"
\r
34 #include <common/concurrency/executor.h>
\r
35 #include <common/utility/assert.h>
\r
36 #include <common/utility/timer.h>
\r
37 #include <common/memory/memshfl.h>
\r
38 #include <common/env.h>
\r
40 #include <boost/circular_buffer.hpp>
\r
41 #include <boost/timer.hpp>
\r
42 #include <boost/range/algorithm.hpp>
\r
43 #include <boost/range/adaptors.hpp>
\r
44 #include <boost/property_tree/ptree.hpp>
\r
46 namespace caspar { namespace core {
\r
// State behind `output`: the public `output` members in this file forward to
// this object through impl_.
48 struct output::implementation
\r
// Channel index; passed to frame_consumer::initialize() and used by print().
50 const int channel_index_;
\r
// Diagnostics graph that receives the "consume-time" value in send().
51 const safe_ptr<diagnostics::graph> graph_;
\r
// Monitor subject constructed with "/output"; send() publishes "/consume_time" on it.
52 monitor::subject monitor_subject_;
\r
// Restarted inside the send() task; its elapsed() value is what gets reported.
53 boost::timer consume_timer_;
\r
// Current video format; replaced by set_video_format_desc().
55 video_format_desc format_desc_;
\r
// Audio channel layout handed to consumers on initialize().
56 channel_layout audio_channel_layout_;
\r
// Attached consumers keyed by index.
58 std::map<int, safe_ptr<frame_consumer>> consumers_;
\r
// Ticked by send() with a period of 1.0/format_desc_.fps.
60 high_prec_timer sync_timer_;
\r
// Recent frames; send() sets its capacity to (max - min consumer buffer depth + 1).
62 boost::circular_buffer<safe_ptr<read_frame>> frames_;
\r
// Per consumer index: frame->get_age_millis() recorded by send() when the frame is handed over.
63 std::map<int, int64_t> send_to_consumers_delays_;
\r
// Constructor parameters, member initializers and body.
// NOTE(review): the line that opens this declaration and at least one
// parameter/initializer are not visible in this excerpt (channel_index and
// executor_ are used below without a visible declaration) — confirm against
// the full file.
69 const safe_ptr<diagnostics::graph>& graph,
\r
70 const video_format_desc& format_desc,
\r
71 const channel_layout& audio_channel_layout,
\r
73 : channel_index_(channel_index)
\r
// The monitor subject is rooted at "/output".
75 , monitor_subject_("/output")
\r
76 , format_desc_(format_desc)
\r
77 , audio_channel_layout_(audio_channel_layout)
\r
// The executor is constructed with the name L"output".
78 , executor_(L"output")
\r
// Sets the colour of the "consume-time" series that send() later updates.
80 graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8));
\r
// Attaches `consumer` under `index`. The consumer is first wrapped by
// create_consumer_cadence_guard() and initialized with the current format,
// audio layout and channel index, then inserted into consumers_.
// NOTE(review): some lines of this function are not visible in this excerpt.
83 void add(int index, safe_ptr<frame_consumer> consumer)
\r
87 consumer = create_consumer_cadence_guard(consumer);
\r
88 consumer->initialize(format_desc_, audio_channel_layout_, channel_index_);
\r
// The insertion is handed to the executor; the lambda captures by reference,
// so invoke() presumably blocks until the task has run — confirm against executor.
90 executor_.invoke([&]
\r
// std::map::insert leaves an existing entry with the same key untouched.
92 consumers_.insert(std::make_pair(index, consumer));
\r
93 CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";
\r
// Convenience overload: attaches `consumer` under the index it reports via
// consumer->index().
97 void add(const safe_ptr<frame_consumer>& consumer)
\r
99 add(consumer->index(), consumer);
\r
// Detaches the consumer registered under `index` (if one is found) together
// with its entry in send_to_consumers_delays_, and logs the removal.
102 void remove(int index)
\r
104 // Destroy consumer on calling thread:
\r
// Holds the detached consumer outside the executor task so that the final
// reference is released further down, after the task.
105 std::shared_ptr<frame_consumer> old_consumer;
\r
107 executor_.invoke([&]
\r
109 auto it = consumers_.find(index);
\r
110 if(it != consumers_.end())
\r
112 old_consumer = it->second;
\r
113 send_to_consumers_delays_.erase(it->first);
\r
114 consumers_.erase(it);
\r
// NOTE(review): the lines between the task and this point are not visible in
// this excerpt; presumably the code below is guarded by a check that
// old_consumer is set — confirm.
// The description is captured first because the consumer is released before
// the log line is written.
120 auto str = old_consumer->print();
\r
121 old_consumer.reset();
\r
122 CASPAR_LOG(info) << print() << L" " << str << L" Removed.";
\r
// Convenience overload: detaches whatever is registered under the index
// reported by consumer->index().
126 void remove(const safe_ptr<frame_consumer>& consumer)
\r
128 remove(consumer->index());
\r
// Re-initializes the attached consumers with `format_desc` inside an executor
// task and then stores `format_desc` as the current format.
131 void set_video_format_desc(const video_format_desc& format_desc)
\r
133 executor_.invoke([&]
\r
135 auto it = consumers_.begin();
\r
136 while(it != consumers_.end())
\r
140 it->second->initialize(format_desc, audio_channel_layout_, channel_index_);
\r
// NOTE(review): the try/catch framing and the iterator advance for the
// success path are not visible in this excerpt. The lines below look like the
// failure path: log the exception, then drop the consumer and its delay
// record — confirm.
145 CASPAR_LOG_CURRENT_EXCEPTION();
\r
146 CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";
\r
147 send_to_consumers_delays_.erase(it->first);
\r
// Post-increment passes the current position to erase() while `it` already
// refers to the next entry.
148 consumers_.erase(it++);
\r
152 format_desc_ = format_desc;
\r
// Builds a map holding consumer->buffer_depth() for each entry in consumers_.
157 std::map<int, size_t> buffer_depths_snapshot() const
\r
159 std::map<int, size_t> result;
\r
161 BOOST_FOREACH(auto& consumer, consumers_)
\r
// NOTE(review): the first argument of make_pair is not visible in this
// excerpt; presumably the consumer's index — confirm.
162 result.insert(std::make_pair(
\r
164 consumer.second->buffer_depth()));
\r
// NOTE(review): std::move on a returned local prevents copy elision; a plain
// `return result;` would be enough.
166 return std::move(result);
\r
169 std::pair<size_t, size_t> minmax_buffer_depth(
\r
170 const std::map<int, size_t>& buffer_depths) const
\r
172 if(consumers_.empty())
\r
173 return std::make_pair(0, 0);
\r
175 auto depths = buffer_depths | boost::adaptors::map_values;
\r
177 return std::make_pair(
\r
178 *boost::range::min_element(depths),
\r
179 *boost::range::max_element(depths));
\r
182 bool has_synchronization_clock() const
\r
184 return boost::range::count_if(consumers_ | boost::adaptors::map_values, [](const safe_ptr<frame_consumer>& x){return x->has_synchronization_clock();}) > 0;
\r
// Queues a task on the executor (begin_invoke) that hands a frame to each
// attached consumer and then collects the results. `packet` is captured by
// copy ([=]); only packet.first is used in the visible code.
// NOTE(review): brace, try/catch and some statement lines of this function
// are not visible in this excerpt; the comments on failure paths below are
// hedged accordingly.
187 void send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& packet)
\r
189 executor_.begin_invoke([=]
\r
193 consume_timer_.restart();
\r
195 auto input_frame = packet.first;
\r
// No consumer reports a synchronization clock: tick sync_timer_ with one
// frame period.
197 if(!has_synchronization_clock())
\r
198 sync_timer_.tick(1.0/format_desc_.fps);
\r
// The frame's image size differs from the current format's size.
// NOTE(review): the rest of this branch is not visible; presumably it
// returns after the tick — confirm.
200 if(input_frame->image_size() != format_desc_.size)
\r
202 sync_timer_.tick(1.0/format_desc_.fps);
\r
206 auto buffer_depths = buffer_depths_snapshot();
\r
207 auto minmax = minmax_buffer_depth(buffer_depths);
\r
// Capacity spans the difference between the largest and smallest consumer
// buffer depth, plus one.
209 frames_.set_capacity(minmax.second - minmax.first + 1);
\r
210 frames_.push_back(input_frame);
\r
// NOTE(review): the body of this check is not visible; presumably the task
// returns until frames_ is full — confirm.
212 if(!frames_.full())
\r
// Futures returned by consumer->send(), keyed by consumer index.
215 std::map<int, boost::unique_future<bool>> send_results;
\r
217 // Start invocations
\r
// No increment in the for header: the erase further down supplies the next
// iterator on the removal path.
218 for (auto it = consumers_.begin(); it != consumers_.end();)
\r
220 auto consumer = it->second;
\r
// Frame selected by this consumer's buffer depth relative to the minimum depth.
221 auto frame = frames_.at(buffer_depths[it->first]-minmax.first);
\r
// Frame age at hand-over; delay_info() reads this value.
223 send_to_consumers_delays_[it->first] = frame->get_age_millis();
\r
227 send_results.insert(std::make_pair(it->first, consumer->send(frame)));
\r
// NOTE(review): presumably the handler for a failed send: log, then send the
// same frame once more — confirm the framing.
232 CASPAR_LOG_CURRENT_EXCEPTION();
\r
235 send_results.insert(std::make_pair(it->first, consumer->send(frame)));
\r
// NOTE(review): presumably the handler for a failed retry: log and drop the
// consumer and its delay record — confirm the framing.
240 CASPAR_LOG_CURRENT_EXCEPTION();
\r
241 CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";
\r
242 send_to_consumers_delays_.erase(it->first);
\r
// erase() returns the iterator following the removed entry.
243 it = consumers_.erase(it);
\r
248 // Retrieve results
\r
249 for (auto result_it = send_results.begin(); result_it != send_results.end(); ++result_it)
\r
251 auto consumer = consumers_.at(result_it->first);
\r
252 auto frame = frames_.at(buffer_depths[result_it->first]-minmax.first);
\r
253 auto& result_future = result_it->second;
\r
// Wait at most two seconds for the result; a timeout raises timed_out.
257 if (!result_future.timed_wait(boost::posix_time::seconds(2)))
\r
259 BOOST_THROW_EXCEPTION(timed_out() << msg_info(narrow(print()) + " " + narrow(consumer->print()) + " Timed out during send"));
\r
// A false result is followed by the removal of the consumer and its delay record.
262 if (!result_future.get())
\r
264 CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";
\r
265 send_to_consumers_delays_.erase(result_it->first);
\r
266 consumers_.erase(result_it->first);
\r
// NOTE(review): presumably the handler for an exception raised above: log,
// re-initialize the consumer with the current format and send the same frame
// again under the same two-second limit — confirm the framing.
271 CASPAR_LOG_CURRENT_EXCEPTION();
\r
274 consumer->initialize(format_desc_, audio_channel_layout_, channel_index_);
\r
275 auto retry_future = consumer->send(frame);
\r
277 if (!retry_future.timed_wait(boost::posix_time::seconds(2)))
\r
279 BOOST_THROW_EXCEPTION(timed_out() << msg_info(narrow(print()) + " " + narrow(consumer->print()) + " Timed out during retry"));
\r
282 if (!retry_future.get())
\r
284 CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";
\r
285 send_to_consumers_delays_.erase(result_it->first);
\r
286 consumers_.erase(result_it->first);
\r
// NOTE(review): presumably the handler for a failed retry: log and drop the
// consumer — confirm the framing.
291 CASPAR_LOG_CURRENT_EXCEPTION();
\r
292 CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";
\r
293 send_to_consumers_delays_.erase(result_it->first);
\r
294 consumers_.erase(result_it->first);
\r
// The graph receives elapsed()*fps*0.5; the monitor subject receives the raw
// elapsed() value.
299 graph_->set_value("consume-time", consume_timer_.elapsed()*format_desc_.fps*0.5);
\r
300 monitor_subject_ << monitor::message("/consume_time") % (consume_timer_.elapsed());
\r
// NOTE(review): presumably the outermost handler of the task — confirm.
304 CASPAR_LOG_CURRENT_EXCEPTION();
\r
309 std::wstring print() const
\r
311 return L"output[" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
\r
// Returns a future for a property tree built on the executor (begin_invoke
// with high_priority). Each consumer's info() tree is added under
// L"consumers.consumer" together with an L"index" entry holding its key.
// NOTE(review): the line returning the tree from the lambda is not visible in
// this excerpt.
314 boost::unique_future<boost::property_tree::wptree> info()
\r
316 return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
\r
318 boost::property_tree::wptree info;
\r
319 BOOST_FOREACH(auto& consumer, consumers_)
\r
// add_child() returns the inserted subtree, so L"index" lands inside that
// consumer's node.
321 info.add_child(L"consumers.consumer", consumer.second->info())
\r
322 .add(L"index", consumer.first);
\r
325 }, high_priority));
\r
// Returns a future for a property tree built on the executor (begin_invoke
// with high_priority) with one L"consumer" child per attached consumer,
// carrying L"name", L"age-at-arrival", L"presentation-time" and
// L"age-at-presentation".
328 boost::unique_future<boost::property_tree::wptree> delay_info()
\r
330 return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
\r
332 boost::property_tree::wptree info;
\r
333 BOOST_FOREACH(auto& consumer, consumers_)
\r
// NOTE(review): the start of this statement is not visible in this excerpt;
// presumably it assigns the result to `total_age`, which is used below — confirm.
336 consumer.second->presentation_frame_age_millis();
\r
// operator[] on the map inserts a zero entry when the consumer has no
// recorded value yet.
337 auto sendoff_age = send_to_consumers_delays_[consumer.first];
\r
338 auto presentation_time = total_age - sendoff_age;
\r
340 boost::property_tree::wptree child;
\r
341 child.add(L"name", consumer.second->print());
\r
342 child.add(L"age-at-arrival", sendoff_age);
\r
343 child.add(L"presentation-time", presentation_time);
\r
344 child.add(L"age-at-presentation", total_age);
\r
346 info.add_child(L"consumer", child);
\r
349 }, high_priority));
\r
// Reads consumers_.empty() inside an executor task and returns the result.
// NOTE(review): the signature line of this function is not visible in this
// excerpt; output::empty() forwards to impl_->empty(), so this is presumably
// that member — confirm.
354 return executor_.invoke([this]
\r
356 return consumers_.empty();
\r
// Accessor returning a reference to the monitor subject owned by this object.
360 monitor::subject& monitor_output()
\r
362 return monitor_subject_;
\r
// Constructs the implementation object with the given graph, format, audio layout and channel index, and stores it in impl_.
366 output::output(const safe_ptr<diagnostics::graph>& graph, const video_format_desc& format_desc, const channel_layout& audio_channel_layout, int channel_index) : impl_(new implementation(graph, format_desc, audio_channel_layout, channel_index)){}
\r
// Forwards to implementation::add(index, consumer).
367 void output::add(int index, const safe_ptr<frame_consumer>& consumer){impl_->add(index, consumer);}
\r
// Forwards to implementation::add(consumer), which uses the consumer's own index.
368 void output::add(const safe_ptr<frame_consumer>& consumer){impl_->add(consumer);}
\r
// Forwards to implementation::remove(index).
369 void output::remove(int index){impl_->remove(index);}
\r
// Forwards to implementation::remove(consumer), which uses the consumer's own index.
370 void output::remove(const safe_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
\r
// Forwards the frame packet to implementation::send().
371 void output::send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) {impl_->send(frame); }
\r
// Forwards to implementation::set_video_format_desc().
372 void output::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}
\r
// Forwards to implementation::info() and returns its future.
373 boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}
\r
// Forwards to implementation::delay_info() and returns its future.
374 boost::unique_future<boost::property_tree::wptree> output::delay_info() const{return impl_->delay_info();}
\r
// Forwards to implementation::empty().
375 bool output::empty() const{return impl_->empty();}
\r
// Forwards to implementation::monitor_output() and returns the same reference.
376 monitor::subject& output::monitor_output() { return impl_->monitor_output(); }
\r