]> git.sesse.net Git - casparcg/blobdiff - core/consumer/frame_consumer_device.cpp
2.0.0.2:
[casparcg] / core / consumer / frame_consumer_device.cpp
index 34622fb99228f120823b06f3179e4c216cb33bc5..9056a79a59fca478547f9fdea7830f665c53e5c0 100644 (file)
 \r
 #include "frame_consumer_device.h"\r
 \r
-#include "../format/video_format.h"\r
-#include "../processor/write_frame.h"\r
-#include "../processor/frame_processor_device.h"\r
+#include "../video_format.h"\r
 \r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/atomic.h>\r
-\r
-#include <boost/foreach.hpp>\r
-#include <boost/thread.hpp>\r
-\r
-#include <boost/date_time/posix_time/posix_time.hpp>\r
+#include <common/concurrency/executor.h>\r
+#include <common/utility/timer.h>\r
 \r
 #include <boost/range/algorithm_ext/erase.hpp>\r
 #include <boost/range/algorithm.hpp>\r
 \r
 namespace caspar { namespace core {\r
-\r
-class clock_sync\r
+       \r
+struct frame_consumer_device::implementation\r
 {\r
-public:\r
-       clock_sync() : time_(boost::posix_time::microsec_clock::local_time()){}\r
+       timer clock_;\r
+       executor executor_;     \r
 \r
-       void wait(double period)\r
-       {                               \r
-               auto remaining = boost::posix_time::microseconds(static_cast<long>(period*1000000.0)) - (boost::posix_time::microsec_clock::local_time() - time_);\r
-               if(remaining > boost::posix_time::microseconds(5000))\r
-                       boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
-       }\r
-private:\r
-       boost::posix_time::ptime time_;\r
-};\r
+       size_t depth_;\r
+       std::deque<safe_ptr<const read_frame>> buffer_;         \r
+\r
+       std::vector<safe_ptr<frame_consumer>> consumers_;\r
+       \r
+       const video_format_desc fmt_;\r
 \r
-struct frame_consumer_device::implementation\r
-{\r
 public:\r
-       implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers) \r
-               : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
+       implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
+               : consumers_(consumers)\r
+               , fmt_(format_desc)\r
        {               \r
+               if(consumers_.empty())\r
+                       BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
+\r
                std::vector<size_t> depths;\r
                boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
-               max_depth_ = *boost::range::max_element(depths);\r
-               display_thread_ = boost::thread([=]{run();});\r
-       }\r
-\r
-       ~implementation()\r
-       {\r
-               is_running_ = false;\r
-               display_thread_.join();\r
-       }\r
-                               \r
-       void run()\r
-       {\r
-               is_running_ = true;\r
-               CASPAR_LOG(warning) << "Starting frame_consumer_device.";               \r
-\r
-               win32_exception::install_handler();\r
-                                                               \r
-               while(is_running_)                                              \r
-               {\r
-                       display_frame(frame_processor_->receive());             \r
-                       is_running_ = !consumers_.empty();\r
-               }\r
-               CASPAR_LOG(warning) << "Shutting down frame_consumer_device.";                  \r
+               depth_ = *boost::range::max_element(depths);\r
+               executor_.set_capacity(3);\r
+               executor_.start();\r
        }\r
-\r
-       void display_frame(const consumer_frame& frame)\r
-       {               \r
-               buffer_.push_back(frame);\r
-\r
-               clock_sync clock;\r
-               \r
-               boost::range::for_each(consumers_, [&](const frame_consumer_ptr& consumer)\r
-               {\r
-                       size_t offset = max_depth_ - consumer->buffer_depth();\r
-                       if(offset < buffer_.size())\r
-                               consumer->send(*(buffer_.begin() + offset));\r
-               });\r
                        \r
-               frame_consumer::sync_mode sync = frame_consumer::ready;\r
-               boost::range::for_each(consumers_, [&](const frame_consumer_ptr& consumer)\r
+       void consume(safe_ptr<const read_frame>&& frame)\r
+       {               \r
+               executor_.begin_invoke([=]\r
                {\r
-                       try\r
-                       {\r
-                               size_t offset = max_depth_ - consumer->buffer_depth();\r
-                               if(offset >= buffer_.size())\r
-                                       return;\r
+                       buffer_.push_back(frame);\r
 \r
-                               if(consumer->synchronize() == frame_consumer::clock)\r
-                                       sync = frame_consumer::clock;\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                               boost::range::remove_erase(consumers_, consumer);\r
-                               CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
-                       }\r
-               });\r
+                       if(buffer_.size() < depth_)\r
+                               return;\r
        \r
-               if(sync != frame_consumer::clock)\r
-                       clock.wait(fmt_.period);\r
+                       boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+                       {\r
+                               try\r
+                               {\r
+                                       consumer->send(buffer_[consumer->buffer_depth()-1]);\r
+                               }\r
+                               catch(...)\r
+                               {\r
+                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                       boost::range::remove_erase(consumers_, consumer);\r
+                                       CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
+                               }\r
+                       });\r
 \r
-               if(buffer_.size() >= max_depth_)\r
                        buffer_.pop_front();\r
-       }\r
-\r
-       size_t max_depth_;\r
-       std::deque<consumer_frame> buffer_;\r
-               \r
-       tbb::atomic<bool> is_running_;\r
-       boost::thread display_thread_;\r
-\r
-       std::vector<frame_consumer_ptr> consumers_;\r
        \r
-       frame_processor_device_ptr frame_processor_;\r
-\r
-       const video_format_desc& fmt_;\r
+                       clock_.wait(fmt_.fps);\r
+               });\r
+       }\r
 };\r
 \r
-frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)\r
-       : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
+frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
+frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : impl_(new implementation(format_desc, consumers)){}\r
+void frame_consumer_device::consume(safe_ptr<const read_frame>&& future_frame) { impl_->consume(std::move(future_frame)); }\r
 }}
\ No newline at end of file