\r
#include "frame_consumer_device.h"\r
\r
-#include "../format/video_format.h"\r
+#include "../video_format.h"\r
\r
#include <common/concurrency/executor.h>\r
#include <common/utility/timer.h>\r
\r
struct frame_consumer_device::implementation\r
{\r
+ timer clock_;\r
+ executor executor_; \r
+\r
+ size_t depth_;\r
+ std::deque<safe_ptr<const read_frame>> buffer_; \r
+\r
+ std::vector<safe_ptr<frame_consumer>> consumers_;\r
+ \r
+ const video_format_desc fmt_;\r
+\r
public:\r
- implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : consumers_(consumers), fmt_(format_desc)\r
+ implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
+ : consumers_(consumers)\r
+ , fmt_(format_desc)\r
{ \r
+ if(consumers_.empty())\r
+ BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
+\r
std::vector<size_t> depths;\r
boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
- max_depth_ = *boost::range::max_element(depths);\r
+ depth_ = *boost::range::max_element(depths);\r
executor_.set_capacity(3);\r
executor_.start();\r
}\r
- \r
- void tick(const safe_ptr<const read_frame>& frame)\r
- {\r
- buffer_.push_back(frame);\r
- \r
- boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
- {\r
- size_t offset = max_depth_ - consumer->buffer_depth();\r
- if(offset < buffer_.size())\r
- consumer->send(*(buffer_.begin() + offset));\r
- });\r
\r
- frame_consumer::sync_mode sync = frame_consumer::ready;\r
- boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+ void consume(safe_ptr<const read_frame>&& frame)\r
+ { \r
+ executor_.begin_invoke([=]\r
{\r
- try\r
- {\r
- size_t offset = max_depth_ - consumer->buffer_depth();\r
- if(offset >= buffer_.size())\r
- return;\r
+ buffer_.push_back(frame);\r
\r
- if(consumer->synchronize() == frame_consumer::clock)\r
- sync = frame_consumer::clock;\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- boost::range::remove_erase(consumers_, consumer);\r
- CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
- }\r
- });\r
+ if(buffer_.size() < depth_)\r
+ return;\r
\r
- if(sync != frame_consumer::clock)\r
- clock_.wait();\r
+ boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+ {\r
+ try\r
+ {\r
+ consumer->send(buffer_[consumer->buffer_depth()-1]);\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ boost::range::remove_erase(consumers_, consumer);\r
+ CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
+ }\r
+ });\r
\r
- if(buffer_.size() >= max_depth_)\r
buffer_.pop_front();\r
- }\r
-\r
- void consume(safe_ptr<const read_frame>&& frame)\r
- { \r
- executor_.begin_invoke([=]{tick(frame);});\r
- }\r
-\r
- timer clock_;\r
- executor executor_; \r
-\r
- size_t max_depth_;\r
- std::deque<safe_ptr<const read_frame>> buffer_; \r
-\r
- std::vector<safe_ptr<frame_consumer>> consumers_;\r
\r
- const video_format_desc& fmt_;\r
+ clock_.wait(fmt_.fps);\r
+ });\r
+ }\r
};\r
\r
frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r