\r
critical_section mutex_;\r
call<output::source_element_t> output_;\r
+\r
+ boost::circular_buffer<safe_ptr<read_frame>> frames_;\r
\r
public:\r
implementation(output::source_t& source, const video_format_desc& format_desc) \r
}\r
}\r
}\r
+\r
+ std::pair<size_t, size_t> minmax_buffer_depth() const\r
+ { \r
+ if(consumers_.empty())\r
+ return std::make_pair(0, 0);\r
+ std::vector<size_t> buffer_depths;\r
+ std::transform(consumers_.begin(), consumers_.end(), std::back_inserter(buffer_depths), [](const decltype(*consumers_.begin())& pair)\r
+ {\r
+ return pair.second->buffer_depth();\r
+ });\r
+ std::sort(buffer_depths.begin(), buffer_depths.end());\r
+ auto min = buffer_depths.front();\r
+ auto max = buffer_depths.back();\r
+ return std::make_pair(min, max);\r
+ }\r
\r
void execute(const output::source_element_t& element)\r
{ \r
\r
{\r
critical_section::scoped_lock lock(mutex_); \r
-\r
+ \r
if(!has_synchronization_clock() || frame->image_size() != format_desc_.size)\r
{ \r
scoped_oversubcription_token oversubscribe;\r
timer_.tick(1.0/format_desc_.fps);\r
}\r
- \r
+ \r
+ auto minmax = minmax_buffer_depth();\r
+\r
+ frames_.set_capacity(minmax.second - minmax.first + 1);\r
+ frames_.push_back(frame);\r
+\r
+ if(!frames_.full())\r
+ return;\r
+\r
std::vector<int> removables; \r
Concurrency::parallel_for_each(consumers_.begin(), consumers_.end(), [&](const decltype(*consumers_.begin())& pair)\r
{ \r
try\r
{\r
- if(!pair.second->send(frame))\r
+ auto consumer = pair.second;\r
+\r
+ if(!consumer->send(frames_.at(consumer->buffer_depth()-minmax.first)))\r
removables.push_back(pair.first);\r
}\r
catch(...)\r