\r
#include "frame_consumer_device.h"\r
\r
-#include "../video/video_format.h"\r
-#include "../processor/frame.h"\r
-#include "../processor/frame_processor_device.h"\r
+#include "../video_format.h"\r
\r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/atomic.h>\r
-\r
-#include <boost/foreach.hpp>\r
-#include <boost/thread.hpp>\r
-\r
-#include <boost/date_time/posix_time/posix_time.hpp>\r
+#include <common/concurrency/executor.h>\r
+#include <common/utility/timer.h>\r
\r
#include <boost/range/algorithm_ext/erase.hpp>\r
+#include <boost/range/algorithm.hpp>\r
\r
namespace caspar { namespace core {\r
-\r
-class video_sync_clock\r
-{\r
-public:\r
- video_sync_clock(const video_format_desc& format_desc)\r
- {\r
- period_ = static_cast<long>(render_video_format_period(format_desc)*1000000.0);\r
- time_ = boost::posix_time::microsec_clock::local_time();\r
- }\r
-\r
- void synchronize()\r
- {\r
- auto remaining = boost::posix_time::microseconds(period_) - (boost::posix_time::microsec_clock::local_time() - time_);\r
- if(remaining > boost::posix_time::microseconds(5000))\r
- boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
- time_ = boost::posix_time::microsec_clock::local_time();\r
- }\r
-private:\r
- boost::posix_time::ptime time_;\r
- long period_;\r
-};\r
-\r
+ \r
struct frame_consumer_device::implementation\r
{\r
-public:\r
- implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers) \r
- : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
- {\r
- if(consumers.empty())\r
- BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
- << msg_info("frame_consumer_device requires atleast one consumer."));\r
+ timer clock_;\r
+ executor executor_; \r
\r
- //if(std::any_of(consumers.begin(), consumers.end(), \r
- // [&](const frame_consumer_ptr& pConsumer)\r
- // { return pConsumer->get_video_format_desc() != format_desc;}))\r
- //{\r
- // BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
- // << msg_info("All consumers must have same frameformat as frame_consumer_device."));\r
- //}\r
+ size_t depth_;\r
+ std::deque<safe_ptr<const read_frame>> buffer_; \r
\r
- needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));\r
- frame_buffer_.set_capacity(3);\r
- is_running_ = true;\r
- display_thread_ = boost::thread([=]{run();});\r
- }\r
+ std::vector<safe_ptr<frame_consumer>> consumers_;\r
+ \r
+ const video_format_desc fmt_;\r
\r
- ~implementation()\r
- {\r
- is_running_ = false;\r
- display_thread_.join();\r
+public:\r
+ implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
+ : consumers_(consumers)\r
+ , fmt_(format_desc)\r
+ { \r
+ if(consumers_.empty())\r
+ BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
+\r
+ std::vector<size_t> depths;\r
+ boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
+ depth_ = *boost::range::max_element(depths);\r
+ executor_.set_capacity(3);\r
+ executor_.start();\r
}\r
- \r
- void run()\r
- {\r
- CASPAR_LOG(info) << L"Started frame_consumer_device thread.";\r
- win32_exception::install_handler();\r
- \r
- video_sync_clock clock(fmt_);\r
- \r
- while(is_running_)\r
- {\r
- if(needs_clock_)\r
- clock.synchronize();\r
\r
- frame_ptr frame;\r
- while(frame == nullptr && !frame_processor_->try_receive(frame))\r
- {\r
- if(frame != nullptr)\r
- CASPAR_LOG(trace) << "Display Buffer Underrun.";\r
- frame_processor_->receive(frame);\r
- } \r
- display_frame(frame); \r
- }\r
- \r
- CASPAR_LOG(info) << L"Ended frame_consumer_device thread.";\r
- }\r
-\r
- void display_frame(const frame_ptr& frame)\r
- {\r
- BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)\r
+ void consume(safe_ptr<const read_frame>&& frame)\r
+ { \r
+ executor_.begin_invoke([=]\r
{\r
- try\r
- {\r
- consumer->prepare(frame);\r
- prepared_frames_.push_back(frame);\r
+ buffer_.push_back(frame);\r
\r
- if(prepared_frames_.size() > 2)\r
+ if(buffer_.size() < depth_)\r
+ return;\r
+ \r
+ boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
+ {\r
+ try\r
{\r
- consumer->display(prepared_frames_.front());\r
- prepared_frames_.pop_front();\r
+ consumer->send(buffer_[consumer->buffer_depth()-1]);\r
}\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- boost::range::remove_erase(consumers_, consumer);\r
- CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
- if(consumers_.empty())\r
+ catch(...)\r
{\r
- CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
- is_running_ = false;\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ boost::range::remove_erase(consumers_, consumer);\r
+ CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
}\r
- }\r
- }\r
- }\r
-\r
- std::deque<frame_ptr> prepared_frames_;\r
- \r
- boost::thread display_thread_;\r
-\r
- tbb::atomic<bool> is_running_;\r
- tbb::concurrent_bounded_queue<frame_ptr> frame_buffer_;\r
+ });\r
\r
- bool needs_clock_;\r
- std::vector<frame_consumer_ptr> consumers_;\r
-\r
- video_format_desc fmt_;\r
-\r
- frame_processor_device_ptr frame_processor_;\r
+ buffer_.pop_front();\r
+ \r
+ clock_.wait(fmt_.fps);\r
+ });\r
+ }\r
};\r
\r
-frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)\r
- : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
+frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
+frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : impl_(new implementation(format_desc, consumers)){}\r
+void frame_consumer_device::consume(safe_ptr<const read_frame>&& future_frame) { impl_->consume(std::move(future_frame)); }\r
}}
\ No newline at end of file