#include <common/prec_timer.h>
#include <common/memshfl.h>
#include <common/env.h>
+#include <common/linq.h>
#include <boost/circular_buffer.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/ptree.hpp>
-#include <boost/range/algorithm.hpp>
-#include <boost/range/adaptors.hpp>
#include <boost/timer.hpp>
+#include <functional>
+
namespace caspar { namespace core {
struct output::impl
{
spl::shared_ptr<diagnostics::graph> graph_;
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_ = spl::make_shared<monitor::subject>("/output");
const int channel_index_;
video_format_desc format_desc_;
std::map<int, port> ports_;
prec_timer sync_timer_;
boost::circular_buffer<const_frame> frames_;
- executor executor_;
+ executor executor_ = { L"output" };
public:
impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index)
: graph_(std::move(graph))
- , monitor_subject_("/output")
, channel_index_(channel_index)
, format_desc_(format_desc)
- , executor_(L"output")
{
- graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8));
+ graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8f));
}
void add(int index, spl::shared_ptr<frame_consumer> consumer)
executor_.begin_invoke([this, index, consumer]
{
port p(index, channel_index_, std::move(consumer));
- p.monitor_output().link_target(&monitor_subject_);
+ p.monitor_output().attach_parent(monitor_subject_);
ports_.insert(std::make_pair(index, std::move(p)));
}, task_priority::high_priority);
}
remove(consumer->index());
}
- void video_format_desc(const core::video_format_desc& format_desc)
+ void set_video_format_desc(const core::video_format_desc& format_desc)
{
executor_.invoke([&]
{
{
if(ports_.empty())
return std::make_pair(0, 0);
-
- auto buffer_depths = ports_ |
- boost::adaptors::map_values | // std::function is MSVC workaround
- boost::adaptors::transformed(std::function<int(const port&)>([](const port& p){return p.buffer_depth();}));
-
- return std::make_pair(*boost::range::min_element(buffer_depths), *boost::range::max_element(buffer_depths));
+ return cpplinq::from(ports_)
+ .select(values())
+ .select(std::mem_fn(&port::buffer_depth))
+ .aggregate(minmax::initial_value<int>(), minmax());
}
bool has_synchronization_clock() const
{
- return boost::range::count_if(ports_ | boost::adaptors::map_values,
- [](const port& p){return p.has_synchronization_clock();}) > 0;
+ return cpplinq::from(ports_)
+ .select(values())
+ .where(std::mem_fn(&port::has_synchronization_clock))
+ .any();
}
void operator()(const_frame input_frame, const core::video_format_desc& format_desc)
{
boost::timer frame_timer;
- video_format_desc(format_desc);
+ set_video_format_desc(format_desc);
executor_.invoke([=]
{
if(!frames_.full())
return;
- std::map<int, boost::unique_future<bool>> send_results;
+ std::map<int, std::future<bool>> send_results;
// Start invocations
for (auto it = ports_.begin(); it != ports_.end();)
return L"output[" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
}
- boost::unique_future<boost::property_tree::wptree> info()
+ std::future<boost::property_tree::wptree> info()
{
return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
{
boost::property_tree::wptree info;
- BOOST_FOREACH(auto& port, ports_)
+ for (auto& port : ports_)
{
info.add_child(L"consumers.consumer", port.second.info())
.add(L"index", port.first);
void output::add(const spl::shared_ptr<frame_consumer>& consumer){impl_->add(consumer);}
void output::remove(int index){impl_->remove(index);}
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
-boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}
+std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
-monitor::source& output::monitor_output() {return impl_->monitor_subject_;}
-}}
\ No newline at end of file
+monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
+}}