]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer_device.cpp
f588d83175d62c0a221c2389d8f70cd8b987b599
[casparcg] / core / consumer / frame_consumer_device.cpp
1 #include "../StdAfx.h"\r
2 \r
3 #ifdef _MSC_VER\r
4 #pragma warning (disable : 4244)\r
5 #endif\r
6 \r
7 #include "frame_consumer_device.h"\r
8 \r
9 #include "../format/video_format.h"\r
10 #include "../processor/write_frame.h"\r
11 #include "../processor/frame_processor_device.h"\r
12 \r
13 #include <tbb/concurrent_queue.h>\r
14 #include <tbb/atomic.h>\r
15 \r
16 #include <boost/foreach.hpp>\r
17 #include <boost/thread.hpp>\r
18 \r
19 #include <boost/date_time/posix_time/posix_time.hpp>\r
20 \r
21 #include <boost/range/algorithm_ext/erase.hpp>\r
22 \r
23 namespace caspar { namespace core {\r
24 \r
25 class video_sync_clock\r
26 {\r
27 public:\r
28         video_sync_clock(const video_format_desc& format_desc)\r
29         {\r
30                 period_ = static_cast<long>(format_desc.period*1000000.0);\r
31                 time_ = boost::posix_time::microsec_clock::local_time();\r
32         }\r
33 \r
34         void synchronize()\r
35         {\r
36                 auto remaining = boost::posix_time::microseconds(period_) -     (boost::posix_time::microsec_clock::local_time() - time_);\r
37                 if(remaining > boost::posix_time::microseconds(5000))\r
38                         boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
39                 time_ = boost::posix_time::microsec_clock::local_time();\r
40         }\r
41 private:\r
42         boost::posix_time::ptime time_;\r
43         long period_;\r
44 };\r
45 \r
46 struct frame_consumer_device::implementation\r
47 {\r
48 public:\r
49         implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers) \r
50                 : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
51         {\r
52                 if(consumers.empty())\r
53                         BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
54                                 << msg_info("frame_consumer_device requires atleast one consumer."));\r
55 \r
56                 //if(std::any_of(consumers.begin(), consumers.end(), \r
57                 //      [&](const frame_consumer_ptr& pConsumer)\r
58                 //      { return pConsumer->get_video_format_desc() != format_desc;}))\r
59                 //{\r
60                 //      BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
61                 //              << msg_info("All consumers must have same frameformat as frame_consumer_device."));\r
62                 //}\r
63 \r
64                 needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));\r
65                 is_running_ = true;\r
66                 display_thread_ = boost::thread([=]{run();});\r
67         }\r
68 \r
69         ~implementation()\r
70         {\r
71                 is_running_ = false;\r
72                 display_thread_.join();\r
73         }\r
74                                 \r
75         void run()\r
76         {\r
77                 win32_exception::install_handler();\r
78                                 \r
79                 video_sync_clock clock(fmt_);\r
80                                 \r
81                 while(is_running_)\r
82                 {\r
83                         if(needs_clock_)\r
84                                 clock.synchronize();\r
85                                                 \r
86                         display_frame(frame_processor_->receive());                     \r
87                 }\r
88         }\r
89 \r
90         void display_frame(const consumer_frame& frame)\r
91         {\r
92                 BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)\r
93                 {\r
94                         try\r
95                         {\r
96                                 consumer->prepare(frame);\r
97                                 prepared_frames_.push_back(frame);\r
98 \r
99                                 if(prepared_frames_.size() > 2)\r
100                                 {\r
101                                         consumer->display(prepared_frames_.front());\r
102                                         prepared_frames_.pop_front();\r
103                                 }\r
104                         }\r
105                         catch(...)\r
106                         {\r
107                                 try\r
108                                 {\r
109                                         CASPAR_LOG_CURRENT_EXCEPTION();\r
110                                         boost::range::remove_erase(consumers_, consumer);\r
111                                         CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
112                                         if(consumers_.empty())\r
113                                         {\r
114                                                 CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
115                                                 is_running_ = false;\r
116                                         }\r
117                                 }\r
118                                 catch(...){}\r
119                         }\r
120                 }\r
121         }\r
122 \r
123         std::deque<consumer_frame> prepared_frames_;\r
124                 \r
125         boost::thread display_thread_;\r
126 \r
127         tbb::atomic<bool> is_running_;\r
128 \r
129         bool needs_clock_;\r
130         std::vector<frame_consumer_ptr> consumers_;\r
131 \r
132         const video_format_desc& fmt_;\r
133 \r
134         frame_processor_device_ptr frame_processor_;\r
135 };\r
136 \r
137 frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)\r
138         : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
139 }}