1 #include "../StdAfx.h"
\r
4 #pragma warning (disable : 4244)
\r
7 #include "frame_consumer_device.h"
\r
9 #include "../format/video_format.h"
\r
10 #include "../processor/write_frame.h"
\r
11 #include "../processor/frame_processor_device.h"
\r
12 #include "../../common/concurrency/executor.h"
\r
14 #include <tbb/concurrent_queue.h>
\r
15 #include <tbb/atomic.h>
\r
17 #include <boost/date_time/posix_time/posix_time.hpp>
\r
19 #include <boost/range/algorithm_ext/erase.hpp>
\r
20 #include <boost/range/algorithm.hpp>
\r
22 namespace caspar { namespace core {
\r
27 clock_sync() : time_(boost::posix_time::microsec_clock::local_time()){}
\r
29 void wait(double period)
\r
31 auto remaining = boost::posix_time::microseconds(static_cast<long>(period*1000000.0)) - (boost::posix_time::microsec_clock::local_time() - time_);
\r
32 if(remaining > boost::posix_time::microseconds(5000))
\r
33 boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));
\r
36 boost::posix_time::ptime time_;
\r
39 struct frame_consumer_device::implementation
\r
42 implementation(const safe_ptr<frame_processor_device>& frame_processor, const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers)
\r
43 : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)
\r
45 std::vector<size_t> depths;
\r
46 boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));
\r
47 max_depth_ = *boost::range::max_element(depths);
\r
49 executor_.begin_invoke([=]{tick();});
\r
54 process(frame_processor_->receive());
\r
55 if(!consumers_.empty())
\r
56 executor_.begin_invoke([=]{tick();});
\r
59 void process(const safe_ptr<const read_frame>& frame)
\r
61 buffer_.push_back(frame);
\r
65 boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)
\r
67 size_t offset = max_depth_ - consumer->buffer_depth();
\r
68 if(offset < buffer_.size())
\r
69 consumer->send(*(buffer_.begin() + offset));
\r
72 frame_consumer::sync_mode sync = frame_consumer::ready;
\r
73 boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)
\r
77 size_t offset = max_depth_ - consumer->buffer_depth();
\r
78 if(offset >= buffer_.size())
\r
81 if(consumer->synchronize() == frame_consumer::clock)
\r
82 sync = frame_consumer::clock;
\r
86 CASPAR_LOG_CURRENT_EXCEPTION();
\r
87 boost::range::remove_erase(consumers_, consumer);
\r
88 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";
\r
92 if(sync != frame_consumer::clock)
\r
93 clock.wait(fmt_.period);
\r
95 if(buffer_.size() >= max_depth_)
\r
96 buffer_.pop_front();
\r
99 executor executor_;
\r
102 std::deque<safe_ptr<const read_frame>> buffer_;
\r
104 std::vector<safe_ptr<frame_consumer>> consumers_;
\r
106 safe_ptr<frame_processor_device> frame_processor_;
\r
108 const video_format_desc& fmt_;
\r
111 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}
\r
112 frame_consumer_device::frame_consumer_device(const safe_ptr<frame_processor_device>& frame_processor, const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers)
\r
113 : impl_(new implementation(frame_processor, format_desc, consumers)){}
\r