bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
+ int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();}
monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
+ int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();}
monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
+ int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();}
monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
+ int64_t presentation_frame_age_millis() const override {return consumer_->presentation_frame_age_millis();}
monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
std::wstring name() const override {return L"empty";}
bool has_synchronization_clock() const override {return false;}
int buffer_depth() const override {return 0;};
- virtual int index() const{return -1;}
+ int index() const override {return -1;}
+ int64_t presentation_frame_age_millis() const override {return -1;}
monitor::subject& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}
boost::property_tree::wptree info() const override
{
virtual bool has_synchronization_clock() const {return true;}
virtual int buffer_depth() const = 0; // -1 to not participate in frame presentation synchronization
virtual int index() const = 0;
+ virtual int64_t presentation_frame_age_millis() const = 0;
};
typedef std::function<spl::shared_ptr<frame_consumer>(
struct output::impl
{
spl::shared_ptr<diagnostics::graph> graph_;
- spl::shared_ptr<monitor::subject> monitor_subject_ = spl::make_shared<monitor::subject>("/output");
+ spl::shared_ptr<monitor::subject> monitor_subject_ = spl::make_shared<monitor::subject>("/output");
const int channel_index_;
video_format_desc format_desc_;
std::map<int, port> ports_;
prec_timer sync_timer_;
boost::circular_buffer<const_frame> frames_;
- executor executor_ = { L"output " + boost::lexical_cast<std::wstring>(channel_index_) };
+ std::map<int, int64_t> send_to_consumers_delays_;
+ executor executor_ { L"output " + boost::lexical_cast<std::wstring>(channel_index_) };
public:
impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index)
: graph_(std::move(graph))
executor_.begin_invoke([=]
{
auto it = ports_.find(index);
- if(it != ports_.end())
- ports_.erase(it);
+ if (it != ports_.end())
+ {
+ ports_.erase(it);
+ send_to_consumers_delays_.erase(index);
+ }
}, task_priority::high_priority);
}
catch(...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it++);
}
}
executor_.invoke([=]
{
-
if(!has_synchronization_clock())
sync_timer_.tick(1.0/format_desc_.fps);
auto& port = it->second;
auto depth = port.buffer_depth();
auto& frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first);
+
+ send_to_consumers_delays_[it->first] = frame.get_age_millis();
try
{
{
CASPAR_LOG_CURRENT_EXCEPTION();
CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it.";
+ send_to_consumers_delays_.erase(it->first);
it = ports_.erase(it);
}
}
try
{
if (!it->second.get())
+ {
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it->first);
+ }
}
catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
+ send_to_consumers_delays_.erase(it->first);
ports_.erase(it->first);
}
}
return info;
}, task_priority::high_priority));
}
+
+ std::future<boost::property_tree::wptree> delay_info()
+ {
+ return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
+ {
+ boost::property_tree::wptree info;
+
+ for (auto& port : ports_)
+ {
+ auto total_age =
+ port.second.presentation_frame_age_millis();
+ auto sendoff_age = send_to_consumers_delays_[port.first];
+ auto presentation_time = total_age - sendoff_age;
+
+ boost::property_tree::wptree child;
+ child.add(L"name", port.second.print());
+ child.add(L"age-at-arrival", sendoff_age);
+ child.add(L"presentation-time", presentation_time);
+ child.add(L"age-at-presentation", total_age);
+
+ info.add_child(L"consumer", child);
+ }
+ return info;
+ }, task_priority::high_priority));
+ }
};
output::output(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_index)){}
void output::remove(int index){impl_->remove(index);}
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
-void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
+std::future<boost::property_tree::wptree> output::delay_info() const{ return impl_->delay_info(); }
+void output::operator()(const_frame frame, const video_format_desc& format_desc){ (*impl_)(std::move(frame), format_desc); }
monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
}}
// Properties
std::future<boost::property_tree::wptree> info() const;
+ std::future<boost::property_tree::wptree> delay_info() const;
private:
struct impl;
{
return consumer_->info();
}
+
+ int64_t presentation_frame_age_millis() const
+ {
+ return consumer_->presentation_frame_age_millis();
+ }
};
port::port(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer) : impl_(new impl(index, channel_index, std::move(consumer))){}
std::wstring port::print() const{ return impl_->print();}
bool port::has_synchronization_clock() const{return impl_->has_synchronization_clock();}
boost::property_tree::wptree port::info() const{return impl_->info();}
+int64_t port::presentation_frame_age_millis() const{ return impl_->presentation_frame_age_millis(); }
}}
\ No newline at end of file
int buffer_depth() const;
bool has_synchronization_clock() const;
boost::property_tree::wptree info() const;
+ int64_t presentation_frame_age_millis() const;
private:
struct impl;
std::unique_ptr<impl> impl_;
#include "../StdAfx.h"
#include "draw_frame.h"
-
#include "frame.h"
-
#include "frame_transform.h"
namespace caspar { namespace core {
{
std::shared_ptr<const_frame> frame_;
std::vector<draw_frame> frames_;
- core::frame_transform frame_transform_;
+ core::frame_transform frame_transform_;
public:
impl()
frame_ == other.frame_ &&
frame_transform_ == other.frame_transform_;
}
+
+ int64_t get_and_record_age_millis(const draw_frame& self)
+ {
+ int64_t result = 0;
+
+ for (auto& frame : frames_)
+ {
+ if (frame != self)
+ result = std::max(result, frame.get_and_record_age_millis());
+ }
+
+ if (frame_)
+ result = std::max(result, frame_->get_age_millis());
+
+ return result;
+ }
};
draw_frame::draw_frame() : impl_(new impl()){}
const core::frame_transform& draw_frame::transform() const { return impl_->frame_transform_;}
core::frame_transform& draw_frame::transform() { return impl_->frame_transform_;}
void draw_frame::accept(frame_visitor& visitor) const{impl_->accept(visitor);}
+int64_t draw_frame::get_and_record_age_millis() { return impl_->get_and_record_age_millis(*this); }
bool draw_frame::operator==(const draw_frame& other)const{return *impl_ == *other.impl_;}
bool draw_frame::operator!=(const draw_frame& other)const{return !(*this == other);}
void swap(draw_frame& other);
void accept(frame_visitor& visitor) const;
-
+
+ int64_t get_and_record_age_millis();
+
bool operator==(const draw_frame& other) const;
bool operator!=(const draw_frame& other) const;
#include <common/except.h>
#include <common/array.h>
#include <common/future.h>
+#include <common/timer.h>
#include <core/frame/frame_visitor.h>
#include <core/frame/pixel_format.h>
core::audio_buffer audio_data_;
const core::pixel_format_desc desc_;
const void* tag_;
- core::frame_geometry geometry_ = frame_geometry::get_default();
+ core::frame_geometry geometry_ = frame_geometry::get_default();
+ caspar::timer since_created_timer_;
impl(std::vector<array<std::uint8_t>> buffers, audio_buffer audio_buffer, const void* tag, const core::pixel_format_desc& desc)
: buffers_(std::move(buffers))
const void* mutable_frame::data_tag()const{return impl_.get();}
const frame_geometry& mutable_frame::geometry() const { return impl_->geometry_; }
void mutable_frame::set_geometry(const frame_geometry& g) { impl_->geometry_ = g; }
+caspar::timer mutable_frame::since_created() const { return impl_->since_created_timer_; }
const const_frame& const_frame::empty()
{
struct const_frame::impl : boost::noncopyable
{
mutable std::vector<std::shared_future<array<const std::uint8_t>>> future_buffers_;
- core::audio_buffer audio_data_;
- const core::pixel_format_desc desc_;
- const void* tag_;
- core::frame_geometry geometry_;
+ core::audio_buffer audio_data_;
+ const core::pixel_format_desc desc_;
+ const void* tag_;
+ core::frame_geometry geometry_;
+ caspar::timer since_created_timer_;
+ bool should_record_age_;
+ mutable tbb::atomic<int64_t> recorded_age_;
impl(const void* tag)
: desc_(core::pixel_format::invalid)
, tag_(tag)
, geometry_(frame_geometry::get_default())
+ , should_record_age_(true)
{
+ recorded_age_ = 0;
}
impl(std::shared_future<array<const std::uint8_t>> image, audio_buffer audio_buffer, const void* tag, const core::pixel_format_desc& desc)
, desc_(desc)
, tag_(tag)
, geometry_(frame_geometry::get_default())
+ , should_record_age_(false)
{
- if(desc.format != core::pixel_format::bgra)
+ if (desc.format != core::pixel_format::bgra)
CASPAR_THROW_EXCEPTION(not_implemented());
future_buffers_.push_back(std::move(image));
, desc_(other.pixel_format_desc())
, tag_(other.stream_tag())
, geometry_(other.geometry())
+ , since_created_timer_(other.since_created())
+ , should_record_age_(true)
{
- for(std::size_t n = 0; n < desc_.planes.size(); ++n)
+ for (std::size_t n = 0; n < desc_.planes.size(); ++n)
{
future_buffers_.push_back(make_ready_future<array<const std::uint8_t>>(std::move(other.image_data(n))).share());
}
+
+ recorded_age_ = -1;
}
array<const std::uint8_t> image_data(int index) const
{
return tag_ != empty().stream_tag() ? desc_.planes.at(0).size : 0;
}
+
+ int64_t get_age_millis() const
+ {
+ if (should_record_age_)
+ {
+ if (recorded_age_ == -1)
+ recorded_age_ = static_cast<int64_t>(since_created_timer_.elapsed() * 1000.0);
+
+ return recorded_age_;
+ }
+ else
+ return static_cast<int64_t>(since_created_timer_.elapsed() * 1000.0);
+ }
};
const_frame::const_frame(const void* tag) : impl_(new impl(tag)){}
}
bool const_frame::operator==(const const_frame& other){return impl_ == other.impl_;}
bool const_frame::operator!=(const const_frame& other){return !(*this == other);}
-bool const_frame::operator<(const const_frame& other){return impl_< other.impl_;}
-bool const_frame::operator>(const const_frame& other){return impl_> other.impl_;}
+bool const_frame::operator<(const const_frame& other){return impl_ < other.impl_;}
+bool const_frame::operator>(const const_frame& other){return impl_ > other.impl_;}
const core::pixel_format_desc& const_frame::pixel_format_desc()const{return impl_->desc_;}
array<const std::uint8_t> const_frame::image_data(int index)const{return impl_->image_data(index);}
const core::audio_buffer& const_frame::audio_data()const{return impl_->audio_data_;}
const void* const_frame::data_tag()const{return impl_.get();}
const frame_geometry& const_frame::geometry() const { return impl_->geometry_; }
void const_frame::set_geometry(const frame_geometry& g) { impl_->geometry_ = g; }
+int64_t const_frame::get_age_millis() const { return impl_->get_age_millis(); }
}}
#include <common/array.h>
#include <common/future_fwd.h>
#include <common/cache_aligned_vector.h>
+#include <common/timer.h>
#include <cstddef>
#include <cstdint>
const core::frame_geometry& geometry() const;
void set_geometry(const frame_geometry& g);
+
+ caspar::timer since_created() const;
private:
struct impl;
const core::frame_geometry& geometry() const;
void set_geometry(const frame_geometry& g);
+ int64_t get_age_millis() const;
bool operator==(const const_frame& other);
bool operator!=(const const_frame& other);
#include <tbb/concurrent_queue.h>
#include <tbb/spin_mutex.h>
+#include <tbb/atomic.h>
#include <unordered_map>
#include <vector>
{
int channel_index_;
spl::shared_ptr<diagnostics::graph> graph_;
+ tbb::atomic<int64_t> current_mix_time_;
audio_mixer audio_mixer_;
spl::shared_ptr<image_mixer> image_mixer_;
, image_mixer_(std::move(image_mixer))
{
graph_->set_color("mix-time", diagnostics::color(1.0f, 0.0f, 0.9f, 0.8f));
+ current_mix_time_ = 0;
}
const_frame operator()(std::map<int, draw_frame> frames, const video_format_desc& format_desc)
return const_frame::empty();
}
});
-
- graph_->set_value("mix-time", frame_timer.elapsed()*format_desc.fps*0.5);
+
+ auto mix_time = frame_timer.elapsed();
+ graph_->set_value("mix-time", mix_time * format_desc.fps * 0.5);
+ current_mix_time_ = static_cast<int64_t>(mix_time * 1000.0);
return frame;
}
std::future<boost::property_tree::wptree> info() const
{
- return make_ready_future(boost::property_tree::wptree());
+ boost::property_tree::wptree info;
+ info.add(L"mix-time", current_mix_time_);
+
+ return make_ready_future(std::move(info));
+ }
+
+ std::future<boost::property_tree::wptree> delay_info() const
+ {
+ boost::property_tree::wptree info;
+ info.put_value(current_mix_time_);
+
+ return make_ready_future(std::move(info));
}
};
void mixer::set_master_volume(float volume) { impl_->set_master_volume(volume); }
float mixer::get_master_volume() { return impl_->get_master_volume(); }
std::future<boost::property_tree::wptree> mixer::info() const{return impl_->info();}
-const_frame mixer::operator()(std::map<int, draw_frame> frames, const video_format_desc& format_desc){return (*impl_)(std::move(frames), format_desc);}
+std::future<boost::property_tree::wptree> mixer::delay_info() const{ return impl_->delay_info(); }
+const_frame mixer::operator()(std::map<int, draw_frame> frames, const video_format_desc& format_desc){ return (*impl_)(std::move(frames), format_desc); }
mutable_frame mixer::create_frame(const void* tag, const core::pixel_format_desc& desc) {return impl_->image_mixer_->create_frame(tag, desc);}
}}
// Properties
std::future<boost::property_tree::wptree> info() const;
+ std::future<boost::property_tree::wptree> delay_info() const;
private:
struct impl;
spl::shared_ptr<frame_producer> background_ = frame_producer::empty();;
boost::optional<int32_t> auto_play_delta_;
bool is_paused_ = false;
+ int64_t current_frame_age_ = 0;
public:
impl(int index)
//foreground_event_subject_ << monitor::event("type") % foreground_->name();
//background_event_subject_ << monitor::event("type") % background_->name();
+
+ current_frame_age_ = frame.get_and_record_age_millis();
return frame;
}
info.add(L"nb_frames", nb_frames == std::numeric_limits<int64_t>::max() ? -1 : nb_frames);
info.add(L"frames-left", nb_frames == std::numeric_limits<int64_t>::max() ? -1 : (foreground_->nb_frames() - foreground_->frame_number() - (auto_play_delta_ ? *auto_play_delta_ : 0)));
+ info.add(L"frame-age", current_frame_age_);
info.add_child(L"producer", foreground_->info());
info.add_child(L"background.producer", background_->info());
return info;
}
+ boost::property_tree::wptree delay_info() const
+ {
+ boost::property_tree::wptree info;
+ info.add(L"producer", foreground_->print());
+ info.add(L"frame-age", current_frame_age_);
+ return info;
+ }
+
void on_interaction(const interaction_event::ptr& event)
{
foreground_->on_interaction(event);
spl::shared_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}
spl::shared_ptr<frame_producer> layer::background() const { return impl_->background_;}
boost::property_tree::wptree layer::info() const{return impl_->info();}
+boost::property_tree::wptree layer::delay_info() const{return impl_->delay_info();}
monitor::subject& layer::monitor_output() {return *impl_->monitor_subject_;}
void layer::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
bool layer::collides(double x, double y) const { return impl_->collides(x, y); }
spl::shared_ptr<frame_producer> background() const;
boost::property_tree::wptree info() const;
+ boost::property_tree::wptree delay_info() const;
private:
struct impl;
{
return get_layer(index).info();
}, task_priority::high_priority);
- }
+ }
+
+ std::future<boost::property_tree::wptree> delay_info()
+ {
+ return std::move(executor_.begin_invoke([this]() -> boost::property_tree::wptree
+ {
+ boost::property_tree::wptree info;
+
+ for (auto& layer : layers_)
+ info.add_child(L"layer", layer.second.delay_info()).add(L"index", layer.first);
+
+ return info;
+ }, task_priority::high_priority));
+ }
+
+ std::future<boost::property_tree::wptree> delay_info(int index)
+ {
+ return std::move(executor_.begin_invoke([=]() -> boost::property_tree::wptree
+ {
+ return get_layer(index).delay_info();
+ }, task_priority::high_priority));
+ }
std::future<std::wstring> call(int index, const std::vector<std::wstring>& params)
{
std::future<std::shared_ptr<frame_producer>> stage::background(int index) { return impl_->background(index); }
std::future<boost::property_tree::wptree> stage::info() const{ return impl_->info(); }
std::future<boost::property_tree::wptree> stage::info(int index) const{ return impl_->info(index); }
-std::map<int, draw_frame> stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);}
+std::future<boost::property_tree::wptree> stage::delay_info() const{ return impl_->delay_info(); }
+std::future<boost::property_tree::wptree> stage::delay_info(int index) const{ return impl_->delay_info(index); }
+std::map<int, draw_frame> stage::operator()(const video_format_desc& format_desc){ return (*impl_)(format_desc); }
monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;}
-//void stage::subscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.subscribe(o);}
-//void stage::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.unsubscribe(o);}
void stage::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
}}
std::future<boost::property_tree::wptree> info() const;
std::future<boost::property_tree::wptree> info(int index) const;
+ std::future<boost::property_tree::wptree> delay_info() const;
+ std::future<boost::property_tree::wptree> delay_info(int layer) const;
private:
struct impl;
spl::shared_ptr<impl> impl_;
return info;
}
+
+ boost::property_tree::wptree delay_info() const
+ {
+ boost::property_tree::wptree info;
+
+ auto stage_info = stage_.delay_info();
+ auto mixer_info = mixer_.delay_info();
+ auto output_info = output_.delay_info();
+
+ // TODO: because of std::async deferred timed waiting does not work so for now we have to block
+ info.add_child(L"layers", stage_info.get());
+ info.add_child(L"mix-time", mixer_info.get());
+ info.add_child(L"output", output_info.get());
+
+ return info;
+ }
};
video_channel::video_channel(int index, const core::video_format_desc& format_desc, std::unique_ptr<image_mixer> image_mixer) : impl_(new impl(index, format_desc, std::move(image_mixer))){}
core::video_format_desc video_channel::video_format_desc() const{return impl_->video_format_desc();}
void core::video_channel::video_format_desc(const core::video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
boost::property_tree::wptree video_channel::info() const{return impl_->info();}
+boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); }
int video_channel::index() const { return impl_->index(); }
monitor::subject& video_channel::monitor_output(){ return *impl_->monitor_subject_; }
spl::shared_ptr<core::frame_factory> frame_factory();
boost::property_tree::wptree info() const;
+ boost::property_tree::wptree delay_info() const;
int index() const;
private:
struct impl;
#include <core/mixer/audio/audio_util.h>
#include <tbb/concurrent_queue.h>
+#include <tbb/atomic.h>
#include <common/assert.h>
#include <boost/lexical_cast.hpp>
struct bluefish_consumer : boost::noncopyable
{
- spl::shared_ptr<CBlueVelvet4> blue_;
+ spl::shared_ptr<CBlueVelvet4> blue_;
const unsigned int device_index_;
const core::video_format_desc format_desc_;
const int channel_index_;
const std::wstring model_name_;
- spl::shared_ptr<diagnostics::graph> graph_;
+ spl::shared_ptr<diagnostics::graph> graph_;
boost::timer frame_timer_;
boost::timer tick_timer_;
boost::timer sync_timer_;
std::array<blue_dma_buffer_ptr, 4> reserved_frames_;
tbb::concurrent_bounded_queue<core::const_frame> frame_buffer_;
-
+ tbb::atomic<int64_t> presentation_delay_millis_;
+ core::const_frame previous_frame_ = core::const_frame::empty();
+
const bool embedded_audio_;
const bool key_only_;
, executor_(print())
{
executor_.set_capacity(1);
+ presentation_delay_millis_ = 0;
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));
graph_->set_color("sync-time", diagnostics::color(1.0f, 0.0f, 0.0f));
frame_timer_.restart();
+ if (previous_frame_ != core::const_frame::empty())
+ presentation_delay_millis_ = previous_frame_.get_age_millis();
+
+ previous_frame_ = frame;
+
// Copy to local buffers
if(!frame.image_data().empty())
return model_name_ + L" [" + boost::lexical_cast<std::wstring>(channel_index_) + L"-" +
boost::lexical_cast<std::wstring>(device_index_) + L"|" + format_desc_.name + L"]";
}
+
+ int64_t presentation_delay_millis() const
+ {
+ return presentation_delay_millis_;
+ }
};
struct bluefish_consumer_proxy : public core::frame_consumer
info.add(L"key-only", key_only_);
info.add(L"device", device_index_);
info.add(L"embedded-audio", embedded_audio_);
+ info.add(L"presentation-frame-age", presentation_frame_age_millis());
return info;
}
return 400 + device_index_;
}
+ int64_t presentation_frame_age_millis() const override
+ {
+ return consumer_ ? consumer_->presentation_delay_millis() : 0;
+ }
+
core::monitor::subject& monitor_output()
{
return monitor_subject_;
{
return frame_.audio_data();
}
+
+ int64_t get_age_millis() const
+ {
+ return frame_.get_age_millis();
+ }
};
struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLinkAudioOutputCallback, boost::noncopyable
tbb::concurrent_bounded_queue<core::const_frame> audio_frame_buffer_;
spl::shared_ptr<diagnostics::graph> graph_;
+ tbb::atomic<int64_t> current_presentation_delay_;
caspar::timer tick_timer_;
retry_task<bool> send_completion_;
, format_desc_(format_desc)
{
is_running_ = true;
+ current_presentation_delay_ = 0;
video_frame_buffer_.set_capacity(1);
try
{
+ auto dframe = reinterpret_cast<decklink_frame*>(completed_frame);
+ current_presentation_delay_ = dframe->get_age_millis();
if(result == bmdOutputFrameDisplayedLate)
{
graph_->set_tag("late-frame");
video_scheduled_ += format_desc_.duration;
- audio_scheduled_ += reinterpret_cast<decklink_frame*>(completed_frame)->audio_data().size()/format_desc_.audio_channels;
+ audio_scheduled_ += dframe->audio_data().size() / format_desc_.audio_channels;
//++video_scheduled_;
//audio_scheduled_ += format_desc_.audio_cadence[0];
//++audio_scheduled_;
core::monitor::subject monitor_subject_;
const configuration config_;
std::unique_ptr<decklink_consumer> consumer_;
+ core::video_format_desc format_desc_;
executor executor_;
public:
void initialize(const core::video_format_desc& format_desc, int channel_index) override
{
+ format_desc_ = format_desc;
executor_.invoke([=]
{
consumer_.reset();
info.add(L"device", config_.device_index);
info.add(L"low-latency", config_.latency == configuration::latency_t::low_latency);
info.add(L"embedded-audio", config_.embedded_audio);
+ info.add(L"presentation-frame-age", presentation_frame_age_millis());
//info.add(L"internal-key", config_.internal_key);
return info;
}
int buffer_depth() const override
{
- return config_.buffer_depth();
+ return config_.buffer_depth() + 2;
}
int index() const override
return 300 + config_.device_index;
}
+ int64_t presentation_frame_age_millis() const override
+ {
+ return consumer_ ? consumer_->current_presentation_delay_ : 0;
+ }
+
core::monitor::subject& monitor_output()
{
return monitor_subject_;
output_format output_format_;
bool key_only_;
-
+ tbb::atomic<int64_t> current_encoding_delay_;
+
executor executor_;
public:
ffmpeg_consumer(const std::string& filename, const core::video_format_desc& format_desc, std::vector<option> options, bool key_only)
, key_only_(key_only)
, executor_(print())
{
+ current_encoding_delay_ = 0;
check_space();
// TODO: Ask stakeholders about case where file already exists.
executor_.begin_invoke([=]
{
encode(frame);
+ current_encoding_delay_ = frame.get_age_millis();
});
}
{
}
- virtual void initialize(const core::video_format_desc& format_desc, int)
+ void initialize(const core::video_format_desc& format_desc, int) override
{
if(consumer_)
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), format_desc, options_, true));
}
}
-
+
+ int64_t presentation_frame_age_millis() const override
+ {
+ return consumer_ ? consumer_->current_encoding_delay_ : 0;
+ }
+
std::future<bool> send(core::const_frame frame) override
{
bool ready_for_frame = consumer_->ready_for_frame();
tbb::atomic<int> tokens_;
boost::mutex tokens_mutex_;
boost::condition_variable tokens_cond_;
+ tbb::atomic<int64_t> current_encoding_delay_;
executor write_executor_;
, video_encoder_executor_(print() + L" video_encoder")
, write_executor_(print() + L" io")
{
- abort_request_ = false;
+ abort_request_ = false;
+ current_encoding_delay_ = 0;
for(auto it =
boost::sregex_iterator(
--tokens_;
std::shared_ptr<void> token(
nullptr,
- [this](void*)
+ [this, frame](void*)
{
++tokens_;
tokens_cond_.notify_one();
+ current_encoding_delay_ = frame.get_age_millis();
});
return executor_.begin_invoke([=]() -> bool
return 100000 + consumer_index_offset_;
}
+ int64_t presentation_frame_age_millis() const override
+ {
+ return current_encoding_delay_;
+ }
+
private:
static int interrupt_cb(void* ctx)
void initialize(const core::video_format_desc&, int) override
{
}
-
+
+ int64_t presentation_frame_age_millis() const override
+ {
+ return 0;
+ }
+
std::future<bool> send(core::const_frame frame) override
{
auto filename = filename_;
return 900;
}
+ virtual int64_t presentation_frame_age_millis() const override
+ {
+ return 0;
+ }
+
virtual bool has_synchronization_clock() const override
{
return provide_sync_ && connected_;
spl::shared_ptr<diagnostics::graph> graph_;
boost::timer perf_timer_;
- int channel_index_ = -1;
+ tbb::atomic<int64_t> presentation_age_;
+ int channel_index_ = -1;
core::video_format_desc format_desc_;
oal_consumer()
{
buffers_.fill(0);
+ presentation_age_ = 0;
init_device();
alSourcePlay(source_);
});
}
-
+
+ int64_t presentation_frame_age_millis() const override
+ {
+ return presentation_age_;
+ }
+
std::future<bool> send(core::const_frame frame) override
{
// Will only block if the default executor queue capacity of 512 is
graph_->set_value("tick-time", perf_timer_.elapsed()*format_desc_.fps*0.5);
perf_timer_.restart();
+ presentation_age_ = frame.get_age_millis() + delay_millis();
});
return make_ready_future(true);
int delay_millis() const
{
- return 60;
+ return 160;
}
int buffer_depth() const override
return L"channel-consumer";
}
+ int64_t presentation_frame_age_millis() const override
+ {
+ return current_age_;
+ }
+
std::wstring print() const override
{
return L"[channel-consumer|" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
if (!is_running_)
return frame;
- frame_buffer_.try_pop(frame);
- //if (frame_buffer_.try_pop(frame))
-// current_age_ = frame.get_age_millis();
+ if (frame_buffer_.try_pop(frame))
+ current_age_ = frame.get_age_millis();
return frame;
}
boost::thread thread_;
tbb::atomic<bool> is_running_;
+ tbb::atomic<int64_t> current_presentation_age_;
ffmpeg::filter filter_;
public:
screen_height_ = square_height_;
is_running_ = true;
+ current_presentation_age_ = 0;
thread_ = boost::thread([this]{run();});
}
window_.Display();*/
+ current_presentation_age_ = frame.get_age_millis();
graph_->set_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5);
tick_timer_.restart();
}
consumer_.reset();
consumer_.reset(new screen_consumer(config_, format_desc, channel_index, sink_));
}
-
+
+ int64_t presentation_frame_age_millis() const override
+ {
+ return consumer_ ? consumer_->current_presentation_age_ : 0;
+ }
+
std::future<bool> send(core::const_frame frame) override
{
return consumer_->send(frame);
info.add(L"key-only", config_.key_only);
info.add(L"windowed", config_.windowed);
info.add(L"auto-deinterlace", config_.auto_deinterlace);
+ info.add(L"vsync", config_.vsync);
return info;
}
return replyString.str();
}
+void info_delay_describer(core::help_sink& sink, const core::help_repository& repo)
+{
+ sink.short_description(L"Get the current delay on a channel or a layer.");
+ sink.syntax(L"INFO [video_channel:int]{-[layer:int]} DELAY");
+ sink.para()->text(L"Get the current delay on the specified channel or layer.");
+}
+
+std::wstring info_delay_command(command_context& ctx)
+{
+ boost::property_tree::wptree info;
+ auto layer = ctx.layer_index(std::numeric_limits<int>::min());
+
+ if (layer == std::numeric_limits<int>::min())
+ info.add_child(L"channel-delay", ctx.channel.channel->delay_info());
+ else
+ info.add_child(L"layer-delay", ctx.channel.channel->stage().delay_info(layer).get())
+ .add(L"index", layer);
+
+ return create_info_xml_reply(info, L"DELAY");
+}
+
void diag_describer(core::help_sink& sink, const core::help_repository& repo)
{
sink.short_description(L"Open the diagnostics window.");
repo.register_command( L"Query Commands", L"INFO SERVER", info_server_describer, info_server_command, 0);
repo.register_command( L"Query Commands", L"INFO QUEUES", info_queues_describer, info_queues_command, 0);
repo.register_command( L"Query Commands", L"INFO THREADS", info_threads_describer, info_threads_command, 0);
+ repo.register_channel_command( L"Query Commands", L"INFO DELAY", info_delay_describer, info_delay_command, 0);
repo.register_command( L"Query Commands", L"DIAG", diag_describer, diag_command, 0);
repo.register_command( L"Query Commands", L"BYE", bye_describer, bye_command, 0);
repo.register_command( L"Query Commands", L"KILL", kill_describer, kill_command, 0);