]> git.sesse.net Git - casparcg/blobdiff - core/consumer/frame_consumer_device.cpp
2.0.0.2:
[casparcg] / core / consumer / frame_consumer_device.cpp
index f588d83175d62c0a221c2389d8f70cd8b987b599..9056a79a59fca478547f9fdea7830f665c53e5c0 100644 (file)
 \r
 #include "frame_consumer_device.h"\r
 \r
-#include "../format/video_format.h"\r
-#include "../processor/write_frame.h"\r
-#include "../processor/frame_processor_device.h"\r
+#include "../video_format.h"\r
 \r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/atomic.h>\r
-\r
-#include <boost/foreach.hpp>\r
-#include <boost/thread.hpp>\r
-\r
-#include <boost/date_time/posix_time/posix_time.hpp>\r
+#include <common/concurrency/executor.h>\r
+#include <common/utility/timer.h>\r
 \r
 #include <boost/range/algorithm_ext/erase.hpp>\r
+#include <boost/range/algorithm.hpp>\r
 \r
 namespace caspar { namespace core {\r
-\r
-class video_sync_clock\r
-{\r
-public:\r
-       video_sync_clock(const video_format_desc& format_desc)\r
-       {\r
-               period_ = static_cast<long>(format_desc.period*1000000.0);\r
-               time_ = boost::posix_time::microsec_clock::local_time();\r
-       }\r
-\r
-       void synchronize()\r
-       {\r
-               auto remaining = boost::posix_time::microseconds(period_) -     (boost::posix_time::microsec_clock::local_time() - time_);\r
-               if(remaining > boost::posix_time::microseconds(5000))\r
-                       boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
-               time_ = boost::posix_time::microsec_clock::local_time();\r
-       }\r
-private:\r
-       boost::posix_time::ptime time_;\r
-       long period_;\r
-};\r
-\r
+       \r
 struct frame_consumer_device::implementation\r
 {\r
-public:\r
-       implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers) \r
-               : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
-       {\r
-               if(consumers.empty())\r
-                       BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
-                               << msg_info("frame_consumer_device requires atleast one consumer."));\r
+       timer clock_;\r
+       executor executor_;     \r
 \r
-               //if(std::any_of(consumers.begin(), consumers.end(), \r
-               //      [&](const frame_consumer_ptr& pConsumer)\r
-               //      { return pConsumer->get_video_format_desc() != format_desc;}))\r
-               //{\r
-               //      BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
-               //              << msg_info("All consumers must have same frameformat as frame_consumer_device."));\r
-               //}\r
+       size_t depth_;\r
+       std::deque<safe_ptr<const read_frame>> buffer_;         \r
 \r
-               needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));\r
-               is_running_ = true;\r
-               display_thread_ = boost::thread([=]{run();});\r
-       }\r
+       std::vector<safe_ptr<frame_consumer>> consumers_;\r
+       \r
+       const video_format_desc fmt_;\r
 \r
-       ~implementation()\r
-       {\r
-               is_running_ = false;\r
-               display_thread_.join();\r
+public:\r
+       implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
+               : consumers_(consumers)\r
+               , fmt_(format_desc)\r
+       {               \r
+               if(consumers_.empty())\r
+                       BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
+\r
+               std::vector<size_t> depths;\r
+               boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
+               depth_ = *boost::range::max_element(depths);\r
+               executor_.set_capacity(3);\r
+               executor_.start();\r
        }\r
-                               \r
-       void run()\r
-       {\r
-               win32_exception::install_handler();\r
-                               \r
-               video_sync_clock clock(fmt_);\r
-                               \r
-               while(is_running_)\r
+                       \r
+       void consume(safe_ptr<const read_frame>&& frame)\r
+       {               \r
+               executor_.begin_invoke([=]\r
                {\r
-                       if(needs_clock_)\r
-                               clock.synchronize();\r
-                                               \r
-                       display_frame(frame_processor_->receive());                     \r
-               }\r
-       }\r
+                       buffer_.push_back(frame);\r
 \r
-       void display_frame(const consumer_frame& frame)\r
-       {\r
-               BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)\r
-               {\r
-                       try\r
+                       if(buffer_.size() < depth_)\r
+                               return;\r
+       \r
+                       boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
                        {\r
-                               consumer->prepare(frame);\r
-                               prepared_frames_.push_back(frame);\r
-\r
-                               if(prepared_frames_.size() > 2)\r
+                               try\r
                                {\r
-                                       consumer->display(prepared_frames_.front());\r
-                                       prepared_frames_.pop_front();\r
+                                       consumer->send(buffer_[consumer->buffer_depth()-1]);\r
                                }\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               try\r
+                               catch(...)\r
                                {\r
                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                                        boost::range::remove_erase(consumers_, consumer);\r
                                        CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
-                                       if(consumers_.empty())\r
-                                       {\r
-                                               CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
-                                               is_running_ = false;\r
-                                       }\r
                                }\r
-                               catch(...){}\r
-                       }\r
-               }\r
-       }\r
-\r
-       std::deque<consumer_frame> prepared_frames_;\r
-               \r
-       boost::thread display_thread_;\r
-\r
-       tbb::atomic<bool> is_running_;\r
+                       });\r
 \r
-       bool needs_clock_;\r
-       std::vector<frame_consumer_ptr> consumers_;\r
-\r
-       const video_format_desc& fmt_;\r
-\r
-       frame_processor_device_ptr frame_processor_;\r
+                       buffer_.pop_front();\r
+       \r
+                       clock_.wait(fmt_.fps);\r
+               });\r
+       }\r
 };\r
 \r
-frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)\r
-       : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
+frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
+frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : impl_(new implementation(format_desc, consumers)){}\r
+void frame_consumer_device::consume(safe_ptr<const read_frame>&& future_frame) { impl_->consume(std::move(future_frame)); }\r
 }}
\ No newline at end of file