1 #include "../StdAfx.h"
\r
4 #pragma warning (disable : 4244)
\r
7 #include "frame_consumer_device.h"
\r
9 #include "../format/video_format.h"
\r
10 #include "../processor/frame.h"
\r
11 #include "../processor/frame_processor_device.h"
\r
13 #include <tbb/concurrent_queue.h>
\r
14 #include <tbb/atomic.h>
\r
16 #include <boost/foreach.hpp>
\r
17 #include <boost/thread.hpp>
\r
19 #include <boost/date_time/posix_time/posix_time.hpp>
\r
21 #include <boost/range/algorithm_ext/erase.hpp>
\r
23 namespace caspar { namespace core {
\r
25 class video_sync_clock
\r
28 video_sync_clock(const video_format_desc& format_desc)
\r
30 period_ = static_cast<long>(format_desc.period*1000000.0);
\r
31 time_ = boost::posix_time::microsec_clock::local_time();
\r
36 auto remaining = boost::posix_time::microseconds(period_) - (boost::posix_time::microsec_clock::local_time() - time_);
\r
37 if(remaining > boost::posix_time::microseconds(5000))
\r
38 boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));
\r
39 time_ = boost::posix_time::microsec_clock::local_time();
\r
42 boost::posix_time::ptime time_;
\r
46 struct frame_consumer_device::implementation
\r
49 implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)
\r
50 : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)
\r
52 if(consumers.empty())
\r
53 BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer")
\r
54 << msg_info("frame_consumer_device requires atleast one consumer."));
\r
56 //if(std::any_of(consumers.begin(), consumers.end(),
\r
57 // [&](const frame_consumer_ptr& pConsumer)
\r
58 // { return pConsumer->get_video_format_desc() != format_desc;}))
\r
60 // BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer")
\r
61 // << msg_info("All consumers must have same frameformat as frame_consumer_device."));
\r
64 needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));
\r
65 frame_buffer_.set_capacity(3);
\r
67 display_thread_ = boost::thread([=]{run();});
\r
72 is_running_ = false;
\r
73 display_thread_.join();
\r
78 win32_exception::install_handler();
\r
80 video_sync_clock clock(fmt_);
\r
85 clock.synchronize();
\r
88 while((frame == nullptr || frame == frame::empty()) && is_running_)
\r
89 frame_processor_->receive(frame);
\r
91 display_frame(frame);
\r
95 void display_frame(const frame_ptr& frame)
\r
97 BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)
\r
101 consumer->prepare(frame);
\r
102 prepared_frames_.push_back(frame);
\r
104 if(prepared_frames_.size() > 2)
\r
106 consumer->display(prepared_frames_.front());
\r
107 prepared_frames_.pop_front();
\r
112 CASPAR_LOG_CURRENT_EXCEPTION();
\r
113 boost::range::remove_erase(consumers_, consumer);
\r
114 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";
\r
115 if(consumers_.empty())
\r
117 CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";
\r
118 is_running_ = false;
\r
124 std::deque<frame_ptr> prepared_frames_;
\r
126 boost::thread display_thread_;
\r
128 tbb::atomic<bool> is_running_;
\r
129 tbb::concurrent_bounded_queue<frame_ptr> frame_buffer_;
\r
132 std::vector<frame_consumer_ptr> consumers_;
\r
134 const video_format_desc& fmt_;
\r
136 frame_processor_device_ptr frame_processor_;
\r
139 frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)
\r
140 : impl_(new implementation(frame_processor, format_desc, consumers)){}
\r