]> git.sesse.net Git - casparcg/blobdiff - core/consumer/frame_consumer_device.cpp
2.0.0.2:
[casparcg] / core / consumer / frame_consumer_device.cpp
index 5c0552d99f40d59e1b3486ccdfdb43dcf3d6f9cd..9056a79a59fca478547f9fdea7830f665c53e5c0 100644 (file)
@@ -6,16 +6,11 @@
 \r
 #include "frame_consumer_device.h"\r
 \r
-#include "../format/video_format.h"\r
+#include "../video_format.h"\r
 \r
 #include <common/concurrency/executor.h>\r
 #include <common/utility/timer.h>\r
 \r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/atomic.h>\r
-\r
-#include <boost/date_time/posix_time/posix_time.hpp>\r
-\r
 #include <boost/range/algorithm_ext/erase.hpp>\r
 #include <boost/range/algorithm.hpp>\r
 \r
@@ -23,70 +18,59 @@ namespace caspar { namespace core {
        \r
 struct frame_consumer_device::implementation\r
 {\r
+       timer clock_;\r
+       executor executor_;     \r
+\r
+       size_t depth_;\r
+       std::deque<safe_ptr<const read_frame>> buffer_;         \r
+\r
+       std::vector<safe_ptr<frame_consumer>> consumers_;\r
+       \r
+       const video_format_desc fmt_;\r
+\r
 public:\r
-       implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : consumers_(consumers), fmt_(format_desc)\r
+       implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
+               : consumers_(consumers)\r
+               , fmt_(format_desc)\r
        {               \r
+               if(consumers_.empty())\r
+                       BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
+\r
                std::vector<size_t> depths;\r
                boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
-               max_depth_ = *boost::range::max_element(depths);\r
+               depth_ = *boost::range::max_element(depths);\r
+               executor_.set_capacity(3);\r
                executor_.start();\r
        }\r
-                                       \r
-       void tick(const safe_ptr<const read_frame>& frame)\r
-       {\r
-               buffer_.push_back(frame);\r
-       \r
-               boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
-               {\r
-                       size_t offset = max_depth_ - consumer->buffer_depth();\r
-                       if(offset < buffer_.size())\r
-                               consumer->send(*(buffer_.begin() + offset));\r
-               });\r
                        \r
-               frame_consumer::sync_mode sync = frame_consumer::ready;\r
-               boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+       void consume(safe_ptr<const read_frame>&& frame)\r
+       {               \r
+               executor_.begin_invoke([=]\r
                {\r
-                       try\r
-                       {\r
-                               size_t offset = max_depth_ - consumer->buffer_depth();\r
-                               if(offset >= buffer_.size())\r
-                                       return;\r
+                       buffer_.push_back(frame);\r
 \r
-                               if(consumer->synchronize() == frame_consumer::clock)\r
-                                       sync = frame_consumer::clock;\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                               boost::range::remove_erase(consumers_, consumer);\r
-                               CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
-                       }\r
-               });\r
+                       if(buffer_.size() < depth_)\r
+                               return;\r
        \r
-               if(sync != frame_consumer::clock)\r
-                       clock_.wait();\r
+                       boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+                       {\r
+                               try\r
+                               {\r
+                                       consumer->send(buffer_[consumer->buffer_depth()-1]);\r
+                               }\r
+                               catch(...)\r
+                               {\r
+                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                       boost::range::remove_erase(consumers_, consumer);\r
+                                       CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
+                               }\r
+                       });\r
 \r
-               if(buffer_.size() >= max_depth_)\r
                        buffer_.pop_front();\r
-       }\r
-\r
-       void consume(safe_ptr<const read_frame>&& frame)\r
-       {               \r
-               if(executor_.size() < 3)\r
-                       executor_.begin_invoke([=]{tick(frame);});\r
-               else\r
-                       executor_.invoke([=]{tick(frame);});\r
-       }\r
-\r
-       timer clock_;\r
-       executor executor_;     \r
-\r
-       size_t max_depth_;\r
-       std::deque<safe_ptr<const read_frame>> buffer_;         \r
-\r
-       std::vector<safe_ptr<frame_consumer>> consumers_;\r
        \r
-       const video_format_desc& fmt_;\r
+                       clock_.wait(fmt_.fps);\r
+               });\r
+       }\r
 };\r
 \r
 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r