X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=core%2Fconsumer%2Foutput.cpp;h=d3c0580eb01bc624b18b396db5a8d9ec42a39206;hb=9e4b08cde6c6de9e83a3fff42d90affc3cd8e5bc;hp=9bc1a59b61f38ca745a61ec35ef07cf45098ff17;hpb=435cf4b385c5099270bee44f89c3e2615af30521;p=casparcg diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index 9bc1a59b6..d3c0580eb 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -1,219 +1,325 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Robert Nagy, ronag89@gmail.com -*/ - -#include "../StdAfx.h" - -#ifdef _MSC_VER -#pragma warning (disable : 4244) -#endif - -#include "output.h" - -#include "frame_consumer.h" - -#include "../video_format.h" -#include "../frame/frame.h" - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace caspar { namespace core { - -struct output::impl -{ - spl::shared_ptr graph_; - const int channel_index_; - video_format_desc format_desc_; - std::map> consumers_; - prec_timer sync_timer_; - boost::circular_buffer frames_; - executor executor_; -public: - impl(spl::shared_ptr graph, const video_format_desc& format_desc, int channel_index) - : graph_(std::move(graph)) - , channel_index_(channel_index) - , format_desc_(format_desc) - , executor_(L"output") - { - graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8)); - } - - void add(int index, spl::shared_ptr consumer) - { - remove(index); - - consumer->initialize(format_desc_, channel_index_); - - executor_.begin_invoke([=] - { - consumers_.insert(std::make_pair(index, consumer)); - CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added."; - }, task_priority::high_priority); - } - - void add(const spl::shared_ptr& consumer) - { - add(consumer->index(), consumer); - } - - void remove(int index) - { - executor_.begin_invoke([=] - { - auto it = consumers_.find(index); - if(it != consumers_.end()) - consumers_.erase(it); - }, task_priority::high_priority); - } - - void remove(const spl::shared_ptr& consumer) - { - remove(consumer->index()); - } - - void video_format_desc(const core::video_format_desc& format_desc) - { - executor_.invoke([&] - { - if(format_desc_ == format_desc) - return; - - auto it = consumers_.begin(); - while(it != consumers_.end()) - { - try - { - it->second->initialize(format_desc, channel_index_); - ++it; - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed."; - consumers_.erase(it++); - } - } - - format_desc_ = format_desc; - frames_.clear(); - }); - } - - std::pair minmax_buffer_depth() const - { - if(consumers_.empty()) - return std::make_pair(0, 0); - - auto buffer_depths = consumers_ | - boost::adaptors::map_values | // std::function is MSVC workaround - boost::adaptors::transformed(std::function&)>([](const spl::shared_ptr& x){return x->buffer_depth();})); - - - return std::make_pair(*boost::range::min_element(buffer_depths), *boost::range::max_element(buffer_depths)); - } - - bool has_synchronization_clock() const - { - return boost::range::count_if(consumers_ | boost::adaptors::map_values, [](const spl::shared_ptr& x){return x->has_synchronization_clock();}) > 0; - } - - void operator()(const_frame input_frame, const core::video_format_desc& format_desc) - { - video_format_desc(format_desc); - - executor_.invoke([=] - { - boost::timer frame_timer; - - if(!has_synchronization_clock()) - sync_timer_.tick(1.0/format_desc_.fps); - - if(input_frame.size() != format_desc_.size) - { - sync_timer_.tick(1.0/format_desc_.fps); - return; - } - - auto minmax = minmax_buffer_depth(); - - frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(1, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back. - frames_.push_back(input_frame); - - if(!frames_.full()) - return; - - for(auto it = consumers_.begin(); it != consumers_.end();) - { - auto consumer = it->second; - auto frame = frames_.at(consumer->buffer_depth()-minmax.first); - - if(consumer->send(frame)) - ++it; - else - { - CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed."; - consumers_.erase(it++); - } - } - - graph_->set_value("consume-time", frame_timer.elapsed()*format_desc.fps*0.5); - }); - } - - std::wstring print() const - { - return L"output[" + boost::lexical_cast(channel_index_) + L"]"; - } - - boost::unique_future info() - { - return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree - { - boost::property_tree::wptree info; - BOOST_FOREACH(auto& consumer, consumers_) - { - info.add_child(L"consumers.consumer", consumer.second->info()) - .add(L"index", consumer.first); - } - return info; - }, task_priority::high_priority)); - } -}; - -output::output(spl::shared_ptr graph, const video_format_desc& format_desc, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_index)){} -void output::add(int index, const spl::shared_ptr& consumer){impl_->add(index, consumer);} -void output::add(const spl::shared_ptr& consumer){impl_->add(consumer);} -void output::remove(int index){impl_->remove(index);} -void output::remove(const spl::shared_ptr& consumer){impl_->remove(consumer);} -boost::unique_future output::info() const{return impl_->info();} -void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);} -}} \ No newline at end of file +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Robert Nagy, ronag89@gmail.com +*/ + +#include "../StdAfx.h" + +#ifdef _MSC_VER +#pragma warning (disable : 4244) +#endif + +#include "output.h" + +#include "frame_consumer.h" +#include "port.h" + +#include "../video_format.h" +#include "../frame/frame.h" +#include "../frame/audio_channel_layout.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace caspar { namespace core { + +struct output::impl +{ + spl::shared_ptr graph_; + spl::shared_ptr monitor_subject_ = spl::make_shared("/output"); + const int channel_index_; + video_format_desc format_desc_; + audio_channel_layout channel_layout_; + std::map ports_; + prec_timer sync_timer_; + boost::circular_buffer frames_; + std::map send_to_consumers_delays_; + executor executor_ { L"output " + boost::lexical_cast(channel_index_) }; +public: + impl(spl::shared_ptr graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) + : graph_(std::move(graph)) + , channel_index_(channel_index) + , format_desc_(format_desc) + , channel_layout_(channel_layout) + { + graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f)); + } + + void add(int index, spl::shared_ptr consumer) + { + remove(index); + + consumer->initialize(format_desc_, channel_layout_, channel_index_); + + executor_.begin_invoke([this, index, consumer] + { + port p(index, channel_index_, std::move(consumer)); + p.monitor_output().attach_parent(monitor_subject_); + ports_.insert(std::make_pair(index, std::move(p))); + }, task_priority::high_priority); + } + + void add(const spl::shared_ptr& consumer) + { + add(consumer->index(), consumer); + } + + void remove(int index) + { + executor_.begin_invoke([=] + { + auto it = ports_.find(index); + if (it != ports_.end()) + { + ports_.erase(it); + send_to_consumers_delays_.erase(index); + } + }, task_priority::high_priority); + } + + void remove(const spl::shared_ptr& consumer) + { + remove(consumer->index()); + } + + void change_channel_format(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) + { + executor_.invoke([&] + { + if(format_desc_ == format_desc && channel_layout_ == channel_layout) + return; + + auto it = ports_.begin(); + while(it != ports_.end()) + { + try + { + it->second.change_channel_format(format_desc, channel_layout); + ++it; + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + send_to_consumers_delays_.erase(it->first); + ports_.erase(it++); + } + } + + format_desc_ = format_desc; + channel_layout_ = channel_layout; + frames_.clear(); + }); + } + + std::pair minmax_buffer_depth() const + { + if(ports_.empty()) + return std::make_pair(0, 0); + + return cpplinq::from(ports_) + .select(values()) + .select(std::mem_fn(&port::buffer_depth)) + .where([](int v) { return v >= 0; }) + .aggregate(minmax::initial_value(), minmax()); + } + + bool has_synchronization_clock() const + { + return cpplinq::from(ports_) + .select(values()) + .where(std::mem_fn(&port::has_synchronization_clock)) + .any(); + } + + std::future operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) + { + spl::shared_ptr frame_timer; + + change_channel_format(format_desc, channel_layout); + + auto pending_send_results = executor_.invoke([=]() -> std::shared_ptr>> + { + if (input_frame.size() != format_desc_.size) + { + CASPAR_LOG(warning) << print() << L" Invalid input frame dimension."; + return nullptr; + } + + auto minmax = minmax_buffer_depth(); + + frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(2, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back. + frames_.push_back(input_frame); + + if (!frames_.full()) + return nullptr; + + spl::shared_ptr>> send_results; + + // Start invocations + for (auto it = ports_.begin(); it != ports_.end();) + { + auto& port = it->second; + auto depth = port.buffer_depth(); + auto& frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first); + + send_to_consumers_delays_[it->first] = frame.get_age_millis(); + + try + { + send_results->insert(std::make_pair(it->first, port.send(frame))); + ++it; + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + try + { + send_results->insert(std::make_pair(it->first, port.send(frame))); + ++it; + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it."; + send_to_consumers_delays_.erase(it->first); + it = ports_.erase(it); + } + } + } + + return send_results; + }); + + if (!pending_send_results) + return make_ready_future(); + + return executor_.begin_invoke([=]() + { + // Retrieve results + for (auto it = pending_send_results->begin(); it != pending_send_results->end(); ++it) + { + try + { + if (!it->second.get()) + { + send_to_consumers_delays_.erase(it->first); + ports_.erase(it->first); + } + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + send_to_consumers_delays_.erase(it->first); + ports_.erase(it->first); + } + } + + if (!has_synchronization_clock()) + sync_timer_.tick(1.0 / format_desc_.fps); + + auto consume_time = frame_timer->elapsed(); + graph_->set_value("consume-time", consume_time * format_desc.fps * 0.5); + *monitor_subject_ + << monitor::message("/consume_time") % consume_time + << monitor::message("/profiler/time") % consume_time % (1.0 / format_desc.fps); + }); + } + + std::wstring print() const + { + return L"output[" + boost::lexical_cast(channel_index_) + L"]"; + } + + std::future info() + { + return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + for (auto& port : ports_) + { + info.add_child(L"consumers.consumer", port.second.info()) + .add(L"index", port.first); + } + return info; + }, task_priority::high_priority)); + } + + std::future delay_info() + { + return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + + for (auto& port : ports_) + { + auto total_age = + port.second.presentation_frame_age_millis(); + auto sendoff_age = send_to_consumers_delays_[port.first]; + auto presentation_time = total_age - sendoff_age; + + boost::property_tree::wptree child; + child.add(L"name", port.second.print()); + child.add(L"age-at-arrival", sendoff_age); + child.add(L"presentation-time", presentation_time); + child.add(L"age-at-presentation", total_age); + + info.add_child(L"consumer", child); + } + return info; + }, task_priority::high_priority)); + } + + std::vector> get_consumers() + { + return executor_.invoke([=] + { + std::vector> consumers; + + for (auto& port : ports_) + consumers.push_back(port.second.consumer()); + + return consumers; + }); + } +}; + +output::output(spl::shared_ptr graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_layout, channel_index)){} +void output::add(int index, const spl::shared_ptr& consumer){impl_->add(index, consumer);} +void output::add(const spl::shared_ptr& consumer){impl_->add(consumer);} +void output::remove(int index){impl_->remove(index);} +void output::remove(const spl::shared_ptr& consumer){impl_->remove(consumer);} +std::future output::info() const{return impl_->info();} +std::future output::delay_info() const{ return impl_->delay_info(); } +std::vector> output::get_consumers() const { return impl_->get_consumers(); } +std::future output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); } +monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;} +}}