1 #include "../StdAfx.h"
\r
4 #pragma warning (disable : 4244)
\r
7 #include "frame_consumer_device.h"
\r
9 #include "../video_format.h"
\r
11 #include <common/concurrency/executor.h>
\r
12 #include <common/utility/timer.h>
\r
14 #include <boost/range/algorithm_ext/erase.hpp>
\r
15 #include <boost/range/algorithm.hpp>
\r
17 namespace caspar { namespace core {
\r
// Private implementation object for frame_consumer_device: the public class
// constructs it with `new implementation(format_desc, consumers)` and forwards
// consume() to it.
// NOTE(review): this excerpt is missing lines (braces, and the declarations of
// `depth_` and `clock_` that are used below) -- check the complete file before
// changing any logic here.
19 struct frame_consumer_device::implementation
\r
// Executor that consume() hands its work to via begin_invoke(); its capacity
// is set to 3 in the constructor.
22 executor executor_;
\r
// Queue of received frames: consume() appends at the back and drops the front
// element after the consumers have been served.
25 std::deque<safe_ptr<const read_frame>> buffer_;
\r
// Consumers that receive frames; copied from the constructor argument.
27 std::vector<safe_ptr<frame_consumer>> consumers_;
\r
// Video format description; `fmt_.fps` is passed to clock_.wait() in consume().
// NOTE(review): the initializer for this const member is not visible in this
// excerpt -- presumably initialized from `format_desc`; confirm.
29 const video_format_desc fmt_;
\r
// Builds the implementation from a format description and a list of consumers.
// Raises invalid_argument ("No consumer.") when the consumer list is empty.
32 implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers)
\r
33 : consumers_(consumers)
\r
36 if(consumers_.empty())
\r
37 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));
\r
// Collect every consumer's buffer_depth() and keep the largest one in depth_.
39 std::vector<size_t> depths;
\r
40 boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));
\r
41 depth_ = *boost::range::max_element(depths);
\r
42 executor_.set_capacity(3);
\r
// Accepts one frame and hands the processing below to executor_.begin_invoke().
// The lambda captures by copy ([=]), so it holds its own copy of `frame`.
46 void consume(safe_ptr<const read_frame>&& frame)
\r
48 executor_.begin_invoke([=]
\r
50 buffer_.push_back(frame);
\r
// Compares the number of buffered frames against the largest consumer depth.
// NOTE(review): the statement guarded by this condition is not visible here --
// presumably an early return until enough frames are buffered; confirm.
52 if(buffer_.size() < depth_)
\r
// Each consumer is sent the frame at index buffer_depth()-1 of the buffer.
// NOTE(review): if buffer_depth() can return 0, the index would wrap around
// (assuming an unsigned return type) -- confirm the valid range.
55 boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)
\r
59 consumer->send(buffer_[consumer->buffer_depth()-1]);
\r
// Failure path: log the current exception, remove the consumer and log a warning.
// NOTE(review): the surrounding try/catch is not visible in this excerpt.
// NOTE(review): remove_erase() modifies consumers_ while the enclosing for_each
// is still iterating over it, and `consumer` refers to an element of that same
// container -- this looks like iterator/reference invalidation; verify.
63 CASPAR_LOG_CURRENT_EXCEPTION();
\r
64 boost::range::remove_erase(consumers_, consumer);
\r
65 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";
\r
// Drop the oldest buffered frame once the consumers have been served.
69 buffer_.pop_front();
\r
// Calls clock_.wait() with the format's fps value.
// NOTE(review): presumably paces consumption to the frame rate; `clock_` is
// not declared in this excerpt -- confirm its type and semantics.
71 clock_.wait(fmt_.fps);
\r
76 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}
\r
77 frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : impl_(new implementation(format_desc, consumers)){}
\r
78 void frame_consumer_device::consume(safe_ptr<const read_frame>&& future_frame) { impl_->consume(std::move(future_frame)); }
\r