.any();
}
- void operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
+ std::future<void> operator()(const_frame input_frame, const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout)
{
- caspar::timer frame_timer;
+ spl::shared_ptr<caspar::timer> frame_timer;
change_channel_format(format_desc, channel_layout);
- executor_.invoke([=]
- {
- if(!has_synchronization_clock())
- sync_timer_.tick(1.0/format_desc_.fps);
-
- if(input_frame.size() != format_desc_.size)
+ auto pending_send_results = executor_.invoke([=]() -> std::shared_ptr<std::map<int, std::future<bool>>>
+ {
+ if (input_frame.size() != format_desc_.size)
{
CASPAR_LOG(debug) << print() << L" Invalid input frame dimension.";
- return;
+ return nullptr;
}
auto minmax = minmax_buffer_depth();
frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(2, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back.
frames_.push_back(input_frame);
- if(!frames_.full())
- return;
+ if (!frames_.full())
+ return nullptr;
- std::map<int, std::future<bool>> send_results;
+ spl::shared_ptr<std::map<int, std::future<bool>>> send_results;
// Start invocations
for (auto it = ports_.begin(); it != ports_.end();)
{
- auto& port = it->second;
+ auto& port = it->second;
auto depth = port.buffer_depth();
auto& frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first);
send_to_consumers_delays_[it->first] = frame.get_age_millis();
-
+
try
{
- send_results.insert(std::make_pair(it->first, port.send(frame)));
+ send_results->insert(std::make_pair(it->first, port.send(frame)));
++it;
}
catch (...)
CASPAR_LOG_CURRENT_EXCEPTION();
try
{
- send_results.insert(std::make_pair(it->first, port.send(frame)));
+ send_results->insert(std::make_pair(it->first, port.send(frame)));
++it;
}
- catch(...)
+ catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it.";
}
}
+ return send_results;
+ });
+
+ if (!pending_send_results)
+ return make_ready_future();
+
+ return executor_.begin_invoke([=]()
+ {
// Retrieve results
- for (auto it = send_results.begin(); it != send_results.end(); ++it)
+ for (auto it = pending_send_results->begin(); it != pending_send_results->end(); ++it)
{
try
{
ports_.erase(it->first);
}
}
- });
- auto consume_time = frame_timer.elapsed();
- graph_->set_value("consume-time", consume_time * format_desc.fps * 0.5);
- *monitor_subject_
- << monitor::message("/consume_time") % consume_time
- << monitor::message("/profiler/time") % consume_time % (1.0 / format_desc.fps);
+ if (!has_synchronization_clock())
+ sync_timer_.tick(1.0 / format_desc_.fps);
+
+ auto consume_time = frame_timer->elapsed();
+ graph_->set_value("consume-time", consume_time * format_desc.fps * 0.5);
+ *monitor_subject_
+ << monitor::message("/consume_time") % consume_time
+ << monitor::message("/profiler/time") % consume_time % (1.0 / format_desc.fps);
+ });
}
std::wstring print() const
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
std::future<boost::property_tree::wptree> output::delay_info() const{ return impl_->delay_info(); }
-void output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ (*impl_)(std::move(frame), format_desc, channel_layout); }
+std::future<void> output::operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout){ return (*impl_)(std::move(frame), format_desc, channel_layout); }
monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
}}
#include <boost/property_tree/ptree_fwd.hpp>
+#include <future>
+
FORWARD2(caspar, diagnostics, class graph);
namespace caspar { namespace core {
// Methods
- void operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout);
+ // Returns when submitted to consumers, but the future indicates when the consumers are ready for a new frame.
+ std::future<void> operator()(const_frame frame, const video_format_desc& format_desc, const core::audio_channel_layout& channel_layout);
void add(const spl::shared_ptr<frame_consumer>& consumer);
void add(int index, const spl::shared_ptr<frame_consumer>& consumer);
#include <common/lock.h>
#include <common/executor.h>
#include <common/timer.h>
+#include <common/future.h>
#include <core/mixer/image/image_mixer.h>
#include <core/diagnostics/call_context.h>
mutable tbb::spin_mutex channel_layout_mutex_;
core::audio_channel_layout channel_layout_;
- const spl::shared_ptr<caspar::diagnostics::graph> graph_ = [](int index)
- {
- core::diagnostics::scoped_call_context save;
- core::diagnostics::call_context::for_thread().video_channel = index;
- return spl::make_shared<caspar::diagnostics::graph>();
- }(index_);
+ const spl::shared_ptr<caspar::diagnostics::graph> graph_ = [](int index)
+ {
+ core::diagnostics::scoped_call_context save;
+ core::diagnostics::call_context::for_thread().video_channel = index;
+ return spl::make_shared<caspar::diagnostics::graph>();
+ }(index_);
caspar::core::output output_;
+ std::future<void> output_ready_for_frame_ = make_ready_future();
spl::shared_ptr<image_mixer> image_mixer_;
caspar::core::mixer mixer_;
caspar::core::stage stage_;
- executor executor_ { L"video_channel " + boost::lexical_cast<std::wstring>(index_) };
+ executor executor_ { L"video_channel " + boost::lexical_cast<std::wstring>(index_) };
public:
impl(
int index,
auto mixed_frame = mixer_(std::move(stage_frames), format_desc, channel_layout);
// Consume
-
- output_(std::move(mixed_frame), format_desc, channel_layout);
+
+ output_ready_for_frame_.get();
+ output_ready_for_frame_ = output_(std::move(mixed_frame), format_desc, channel_layout);
auto frame_time = frame_timer.elapsed()*format_desc.fps*0.5;
graph_->set_value("tick-time", frame_time);