X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=core%2Fconsumer%2Foutput.cpp;h=d3c0580eb01bc624b18b396db5a8d9ec42a39206;hb=9e4b08cde6c6de9e83a3fff42d90affc3cd8e5bc;hp=048b6274c2d86e6390362fa1a389bc27661b6b45;hpb=0a0753380eefc7aa64561308ad02564735262ae9;p=casparcg diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index 048b6274c..d3c0580eb 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -32,6 +32,7 @@ #include "../video_format.h" #include "../frame/frame.h" +#include "../frame/audio_channel_layout.h" #include #include @@ -41,43 +42,46 @@ #include #include #include +#include #include #include #include -#include #include namespace caspar { namespace core { struct output::impl -{ +{ spl::shared_ptr graph_; - spl::shared_ptr monitor_subject_ = spl::make_shared("/output"); + spl::shared_ptr monitor_subject_ = spl::make_shared("/output"); const int channel_index_; video_format_desc format_desc_; - std::map ports_; + audio_channel_layout channel_layout_; + std::map ports_; prec_timer sync_timer_; boost::circular_buffer frames_; - executor executor_ = { L"output" }; + std::map send_to_consumers_delays_; + executor executor_ { L"output " + boost::lexical_cast(channel_index_) }; public: - impl(spl::shared_ptr graph, const video_format_desc& format_desc, int channel_index) + impl(spl::shared_ptr graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index) : graph_(std::move(graph)) , channel_index_(channel_index) , format_desc_(format_desc) + , channel_layout_(channel_layout) { graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f)); - } - + } + void add(int index, spl::shared_ptr consumer) - { + { remove(index); - consumer->initialize(format_desc_, channel_index_); - + consumer->initialize(format_desc_, channel_layout_, channel_index_); + executor_.begin_invoke([this, index, consumer] - { + { port p(index, channel_index_, std::move(consumer)); p.monitor_output().attach_parent(monitor_subject_); ports_.insert(std::make_pair(index, std::move(p))); @@ -90,12 +94,15 @@ public: } void remove(int index) - { + { executor_.begin_invoke([=] { auto it = ports_.find(index); - if(it != ports_.end()) - ports_.erase(it); + if (it != ports_.end()) + { + ports_.erase(it); + send_to_consumers_delays_.erase(index); + } }, task_priority::high_priority); } @@ -103,42 +110,45 @@ public: { remove(consumer->index()); } - - void set_video_format_desc(const core::video_format_desc& format_desc) + + void change_channel_format(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) { executor_.invoke([&] { - if(format_desc_ == format_desc) + if(format_desc_ == format_desc && channel_layout_ == channel_layout) return; auto it = ports_.begin(); while(it != ports_.end()) - { + { try { - it->second.video_format_desc(format_desc); + it->second.change_channel_format(format_desc, channel_layout); ++it; } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); + send_to_consumers_delays_.erase(it->first); ports_.erase(it++); } } - + format_desc_ = format_desc; + channel_layout_ = channel_layout; frames_.clear(); }); } std::pair minmax_buffer_depth() const - { + { if(ports_.empty()) return std::make_pair(0, 0); return cpplinq::from(ports_) .select(values()) .select(std::mem_fn(&port::buffer_depth)) + .where([](int v) { return v >= 0; }) .aggregate(minmax::initial_value(), minmax()); } @@ -149,23 +159,19 @@ public: .where(std::mem_fn(&port::has_synchronization_clock)) .any(); } - - void operator()(const_frame input_frame, const core::video_format_desc& format_desc) - { - boost::timer frame_timer; - set_video_format_desc(format_desc); + std::future operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout) + { + spl::shared_ptr frame_timer; - executor_.invoke([=] - { + change_channel_format(format_desc, channel_layout); - if(!has_synchronization_clock()) - sync_timer_.tick(1.0/format_desc_.fps); - - if(input_frame.size() != format_desc_.size) + auto pending_send_results = executor_.invoke([=]() -> std::shared_ptr>> + { + if (input_frame.size() != format_desc_.size) { - CASPAR_LOG(debug) << print() << L" Invalid input frame dimension."; - return; + CASPAR_LOG(warning) << print() << L" Invalid input frame dimension."; + return nullptr; } auto minmax = minmax_buffer_depth(); @@ -173,20 +179,23 @@ public: frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(2, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back. frames_.push_back(input_frame); - if(!frames_.full()) - return; + if (!frames_.full()) + return nullptr; - std::map> send_results; + spl::shared_ptr>> send_results; // Start invocations for (auto it = ports_.begin(); it != ports_.end();) { - auto& port = it->second; - auto& frame = frames_.at(port.buffer_depth()-minmax.first); - + auto& port = it->second; + auto depth = port.buffer_depth(); + auto& frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first); + + send_to_consumers_delays_[it->first] = frame.get_age_millis(); + try { - send_results.insert(std::make_pair(it->first, port.send(frame))); + send_results->insert(std::make_pair(it->first, port.send(frame))); ++it; } catch (...) @@ -194,35 +203,55 @@ public: CASPAR_LOG_CURRENT_EXCEPTION(); try { - send_results.insert(std::make_pair(it->first, port.send(frame))); + send_results->insert(std::make_pair(it->first, port.send(frame))); ++it; } - catch(...) + catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it."; + send_to_consumers_delays_.erase(it->first); it = ports_.erase(it); } } } + return send_results; + }); + + if (!pending_send_results) + return make_ready_future(); + + return executor_.begin_invoke([=]() + { // Retrieve results - for (auto it = send_results.begin(); it != send_results.end(); ++it) + for (auto it = pending_send_results->begin(); it != pending_send_results->end(); ++it) { try { if (!it->second.get()) + { + send_to_consumers_delays_.erase(it->first); ports_.erase(it->first); + } } catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); + send_to_consumers_delays_.erase(it->first); ports_.erase(it->first); } } - }); - graph_->set_value("consume-time", frame_timer.elapsed()*format_desc.fps*0.5); + if (!has_synchronization_clock()) + sync_timer_.tick(1.0 / format_desc_.fps); + + auto consume_time = frame_timer->elapsed(); + graph_->set_value("consume-time", consume_time * format_desc.fps * 0.5); + *monitor_subject_ + << monitor::message("/consume_time") % consume_time + << monitor::message("/profiler/time") % consume_time % (1.0 / format_desc.fps); + }); } std::wstring print() const @@ -233,24 +262,64 @@ public: std::future info() { return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree - { + { boost::property_tree::wptree info; for (auto& port : ports_) { info.add_child(L"consumers.consumer", port.second.info()) - .add(L"index", port.first); + .add(L"index", port.first); } return info; }, task_priority::high_priority)); } + + std::future delay_info() + { + return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + + for (auto& port : ports_) + { + auto total_age = + port.second.presentation_frame_age_millis(); + auto sendoff_age = send_to_consumers_delays_[port.first]; + auto presentation_time = total_age - sendoff_age; + + boost::property_tree::wptree child; + child.add(L"name", port.second.print()); + child.add(L"age-at-arrival", sendoff_age); + child.add(L"presentation-time", presentation_time); + child.add(L"age-at-presentation", total_age); + + info.add_child(L"consumer", child); + } + return info; + }, task_priority::high_priority)); + } + + std::vector> get_consumers() + { + return executor_.invoke([=] + { + std::vector> consumers; + + for (auto& port : ports_) + consumers.push_back(port.second.consumer()); + + return consumers; + }); + } }; -output::output(spl::shared_ptr graph, const video_format_desc& format_desc, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_index)){} +output::output(spl::shared_ptr graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_layout, channel_index)){} void output::add(int index, const spl::shared_ptr& consumer){impl_->add(index, consumer);} void output::add(const spl::shared_ptr& consumer){impl_->add(consumer);} void output::remove(int index){impl_->remove(index);} void output::remove(const spl::shared_ptr& consumer){impl_->remove(consumer);} std::future output::info() const{return impl_->info();} -void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);} +std::future output::delay_info() const{ return impl_->delay_info(); } +std::vector> output::get_consumers() const { return impl_->get_consumers(); } +std::future output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); } monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;} }}