#include "../video_format.h"
#include "../frame/frame.h"
+#include "../frame/audio_channel_layout.h"
#include <common/assert.h>
#include <common/future.h>
#include <common/memshfl.h>
#include <common/env.h>
#include <common/linq.h>
+#include <common/timer.h>
#include <boost/circular_buffer.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/ptree.hpp>
-#include <boost/timer.hpp>
#include <functional>
namespace caspar { namespace core {
struct output::impl
-{
+{
spl::shared_ptr<diagnostics::graph> graph_;
- spl::shared_ptr<monitor::subject> monitor_subject_ = spl::make_shared<monitor::subject>("/output");
+ spl::shared_ptr<monitor::subject> monitor_subject_ = spl::make_shared<monitor::subject>("/output");
const int channel_index_;
video_format_desc format_desc_;
- std::map<int, port> ports_;
+ audio_channel_layout channel_layout_;
+ std::map<int, port> ports_;
prec_timer sync_timer_;
boost::circular_buffer<const_frame> frames_;
- executor executor_ = { L"output" };
+ std::map<int, int64_t> send_to_consumers_delays_;
+ executor executor_ { L"output " + boost::lexical_cast<std::wstring>(channel_index_) };
public:
- impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index)
+ impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, const audio_channel_layout& channel_layout, int channel_index)
: graph_(std::move(graph))
, channel_index_(channel_index)
, format_desc_(format_desc)
+ , channel_layout_(channel_layout)
{
graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f));
- }
-
+ }
+
void add(int index, spl::shared_ptr<frame_consumer> consumer)
- {
+ {
remove(index);
- consumer->initialize(format_desc_, channel_index_);
-
+ consumer->initialize(format_desc_, channel_layout_, channel_index_);
+
executor_.begin_invoke([this, index, consumer]
- {
+ {
port p(index, channel_index_, std::move(consumer));
p.monitor_output().attach_parent(monitor_subject_);
ports_.insert(std::make_pair(index, std::move(p)));
}
void remove(int index)
- {
+ {
executor_.begin_invoke([=]
{
auto it = ports_.find(index);
- if(it != ports_.end())
- ports_.erase(it);
+ if (it != ports_.end())
+ {
+ ports_.erase(it);
+ send_to_consumers_delays_.erase(index);
+ }
}, task_priority::high_priority);
}
{
remove(consumer->index());
}
-
- void video_format_desc(const core::video_format_desc& format_desc)
+
+ void change_channel_format(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
{
executor_.invoke([&]
{
- if(format_desc_ == format_desc)
+ if(format_desc_ == format_desc && channel_layout_ == channel_layout)
return;
auto it = ports_.begin();
while(it != ports_.end())
- {
+ {
try
{
- it->second.video_format_desc(format_desc);
+ it->second.change_channel_format(format_desc, channel_layout);
++it;
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it++);
}
}
-
+
format_desc_ = format_desc;
+ channel_layout_ = channel_layout;
frames_.clear();
});
}
std::pair<int, int> minmax_buffer_depth() const
- {
+ {
if(ports_.empty())
return std::make_pair(0, 0);
return cpplinq::from(ports_)
.select(values())
.select(std::mem_fn(&port::buffer_depth))
+ .where([](int v) { return v >= 0; })
.aggregate(minmax::initial_value<int>(), minmax());
}
.where(std::mem_fn(&port::has_synchronization_clock))
.any();
}
-
- void operator()(const_frame input_frame, const core::video_format_desc& format_desc)
- {
- boost::timer frame_timer;
- video_format_desc(format_desc);
+ std::future<void> operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
+ {
+ spl::shared_ptr<caspar::timer> frame_timer;
- executor_.invoke([=]
- {
+ change_channel_format(format_desc, channel_layout);
- if(!has_synchronization_clock())
- sync_timer_.tick(1.0/format_desc_.fps);
-
- if(input_frame.size() != format_desc_.size)
+ auto pending_send_results = executor_.invoke([=]() -> std::shared_ptr<std::map<int, std::future<bool>>>
+ {
+ if (input_frame.size() != format_desc_.size)
{
- CASPAR_LOG(debug) << print() << L" Invalid input frame dimension.";
- return;
+ CASPAR_LOG(warning) << print() << L" Invalid input frame dimension.";
+ return nullptr;
}
auto minmax = minmax_buffer_depth();
frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(2, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back.
frames_.push_back(input_frame);
- if(!frames_.full())
- return;
+ if (!frames_.full())
+ return nullptr;
- std::map<int, std::future<bool>> send_results;
+ spl::shared_ptr<std::map<int, std::future<bool>>> send_results;
// Start invocations
for (auto it = ports_.begin(); it != ports_.end();)
{
- auto& port = it->second;
- auto& frame = frames_.at(port.buffer_depth()-minmax.first);
-
+ auto& port = it->second;
+ auto depth = port.buffer_depth();
+ auto& frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first);
+
+ send_to_consumers_delays_[it->first] = frame.get_age_millis();
+
try
{
- send_results.insert(std::make_pair(it->first, port.send(frame)));
+ send_results->insert(std::make_pair(it->first, port.send(frame)));
++it;
}
catch (...)
CASPAR_LOG_CURRENT_EXCEPTION();
try
{
- send_results.insert(std::make_pair(it->first, port.send(frame)));
+ send_results->insert(std::make_pair(it->first, port.send(frame)));
++it;
}
- catch(...)
+ catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it.";
+ send_to_consumers_delays_.erase(it->first);
it = ports_.erase(it);
}
}
}
+ return send_results;
+ });
+
+ if (!pending_send_results)
+ return make_ready_future();
+
+ return executor_.begin_invoke([=]()
+ {
// Retrieve results
- for (auto it = send_results.begin(); it != send_results.end(); ++it)
+ for (auto it = pending_send_results->begin(); it != pending_send_results->end(); ++it)
{
try
{
if (!it->second.get())
+ {
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it->first);
+ }
}
catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it->first);
}
}
- });
- graph_->set_value("consume-time", frame_timer.elapsed()*format_desc.fps*0.5);
+ if (!has_synchronization_clock())
+ sync_timer_.tick(1.0 / format_desc_.fps);
+
+ auto consume_time = frame_timer->elapsed();
+ graph_->set_value("consume-time", consume_time * format_desc.fps * 0.5);
+ *monitor_subject_
+ << monitor::message("/consume_time") % consume_time
+ << monitor::message("/profiler/time") % consume_time % (1.0 / format_desc.fps);
+ });
}
std::wstring print() const
std::future<boost::property_tree::wptree> info()
{
return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
- {
+ {
boost::property_tree::wptree info;
for (auto& port : ports_)
{
info.add_child(L"consumers.consumer", port.second.info())
- .add(L"index", port.first);
+ .add(L"index", port.first);
}
return info;
}, task_priority::high_priority));
}
+
+ std::future<boost::property_tree::wptree> delay_info()
+ {
+ return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
+ {
+ boost::property_tree::wptree info;
+
+ for (auto& port : ports_)
+ {
+ auto total_age =
+ port.second.presentation_frame_age_millis();
+ auto sendoff_age = send_to_consumers_delays_[port.first];
+ auto presentation_time = total_age - sendoff_age;
+
+ boost::property_tree::wptree child;
+ child.add(L"name", port.second.print());
+ child.add(L"age-at-arrival", sendoff_age);
+ child.add(L"presentation-time", presentation_time);
+ child.add(L"age-at-presentation", total_age);
+
+ info.add_child(L"consumer", child);
+ }
+ return info;
+ }, task_priority::high_priority));
+ }
+
+ std::vector<spl::shared_ptr<const frame_consumer>> get_consumers()
+ {
+ return executor_.invoke([=]
+ {
+ std::vector<spl::shared_ptr<const frame_consumer>> consumers;
+
+ for (auto& port : ports_)
+ consumers.push_back(port.second.consumer());
+
+ return consumers;
+ });
+ }
};
-output::output(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_index)){}
+output::output(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_layout, channel_index)){}
void output::add(int index, const spl::shared_ptr<frame_consumer>& consumer){impl_->add(index, consumer);}
void output::add(const spl::shared_ptr<frame_consumer>& consumer){impl_->add(consumer);}
void output::remove(int index){impl_->remove(index);}
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
-void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
+std::future<boost::property_tree::wptree> output::delay_info() const{ return impl_->delay_info(); }
+std::vector<spl::shared_ptr<const frame_consumer>> output::get_consumers() const { return impl_->get_consumers(); }
+std::future<void> output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); }
monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
-}}
\ No newline at end of file
+}}