]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer_device.cpp
2.0.0.2
[casparcg] / core / consumer / frame_consumer_device.cpp
1 #include "../StdAfx.h"\r
2 \r
3 #ifdef _MSC_VER\r
4 #pragma warning (disable : 4244)\r
5 #endif\r
6 \r
7 #include "frame_consumer_device.h"\r
8 \r
9 #include "../format/video_format.h"\r
10 #include "../processor/write_frame.h"\r
11 #include "../processor/frame_processor_device.h"\r
12 #include "../../common/concurrency/executor.h"\r
13 \r
14 #include <tbb/concurrent_queue.h>\r
15 #include <tbb/atomic.h>\r
16 \r
17 #include <boost/date_time/posix_time/posix_time.hpp>\r
18 \r
19 #include <boost/range/algorithm_ext/erase.hpp>\r
20 #include <boost/range/algorithm.hpp>\r
21 \r
22 namespace caspar { namespace core {\r
23 \r
24 class clock_sync\r
25 {\r
26 public:\r
27         clock_sync() : time_(boost::posix_time::microsec_clock::local_time()){}\r
28 \r
29         void wait(double period)\r
30         {                               \r
31                 auto remaining = boost::posix_time::microseconds(static_cast<long>(period*1000000.0)) - (boost::posix_time::microsec_clock::local_time() - time_);\r
32                 if(remaining > boost::posix_time::microseconds(5000))\r
33                         boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
34         }\r
35 private:\r
36         boost::posix_time::ptime time_;\r
37 };\r
38 \r
39 struct frame_consumer_device::implementation\r
40 {\r
41 public:\r
42         implementation(const safe_ptr<frame_processor_device>& frame_processor, const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
43                 : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
44         {               \r
45                 std::vector<size_t> depths;\r
46                 boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
47                 max_depth_ = *boost::range::max_element(depths);\r
48                 executor_.start();\r
49                 executor_.begin_invoke([=]{tick();});\r
50         }\r
51                                         \r
52         void tick()\r
53         {\r
54                 process(frame_processor_->receive());           \r
55                 if(!consumers_.empty())\r
56                         executor_.begin_invoke([=]{tick();});\r
57         }\r
58 \r
59         void process(const safe_ptr<const read_frame>& frame)\r
60         {               \r
61                 buffer_.push_back(frame);\r
62 \r
63                 clock_sync clock;\r
64                 \r
65                 boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
66                 {\r
67                         size_t offset = max_depth_ - consumer->buffer_depth();\r
68                         if(offset < buffer_.size())\r
69                                 consumer->send(*(buffer_.begin() + offset));\r
70                 });\r
71                         \r
72                 frame_consumer::sync_mode sync = frame_consumer::ready;\r
73                 boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
74                 {\r
75                         try\r
76                         {\r
77                                 size_t offset = max_depth_ - consumer->buffer_depth();\r
78                                 if(offset >= buffer_.size())\r
79                                         return;\r
80 \r
81                                 if(consumer->synchronize() == frame_consumer::clock)\r
82                                         sync = frame_consumer::clock;\r
83                         }\r
84                         catch(...)\r
85                         {\r
86                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
87                                 boost::range::remove_erase(consumers_, consumer);\r
88                                 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
89                         }\r
90                 });\r
91         \r
92                 if(sync != frame_consumer::clock)\r
93                         clock.wait(fmt_.period);\r
94 \r
95                 if(buffer_.size() >= max_depth_)\r
96                         buffer_.pop_front();\r
97         }\r
98 \r
99         executor executor_;     \r
100 \r
101         size_t max_depth_;\r
102         std::deque<safe_ptr<const read_frame>> buffer_;         \r
103 \r
104         std::vector<safe_ptr<frame_consumer>> consumers_;\r
105         \r
106         safe_ptr<frame_processor_device> frame_processor_;\r
107 \r
108         const video_format_desc& fmt_;\r
109 };\r
110 \r
111 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
112 frame_consumer_device::frame_consumer_device(const safe_ptr<frame_processor_device>& frame_processor, const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers)\r
113         : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
114 }}