// Source: git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer_device.cpp
// git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
// Path: [casparcg] / core / consumer / frame_consumer_device.cpp
1 #include "../StdAfx.h"\r
2 \r
3 #ifdef _MSC_VER\r
4 #pragma warning (disable : 4244)\r
5 #endif\r
6 \r
7 #include "frame_consumer_device.h"\r
8 \r
9 #include "../format/video_format.h"\r
10 #include "../processor/frame.h"\r
11 #include "../processor/frame_processor_device.h"\r
12 \r
13 #include <tbb/concurrent_queue.h>\r
14 #include <tbb/atomic.h>\r
15 \r
16 #include <boost/foreach.hpp>\r
17 #include <boost/thread.hpp>\r
18 \r
19 #include <boost/date_time/posix_time/posix_time.hpp>\r
20 \r
21 #include <boost/range/algorithm_ext/erase.hpp>\r
22 \r
23 namespace caspar { namespace core {\r
24 \r
25 class video_sync_clock\r
26 {\r
27 public:\r
28         video_sync_clock(const video_format_desc& format_desc)\r
29         {\r
30                 period_ = static_cast<long>(format_desc.period*1000000.0);\r
31                 time_ = boost::posix_time::microsec_clock::local_time();\r
32         }\r
33 \r
34         void synchronize()\r
35         {\r
36                 auto remaining = boost::posix_time::microseconds(period_) -     (boost::posix_time::microsec_clock::local_time() - time_);\r
37                 if(remaining > boost::posix_time::microseconds(5000))\r
38                         boost::this_thread::sleep(remaining - boost::posix_time::microseconds(5000));\r
39                 time_ = boost::posix_time::microsec_clock::local_time();\r
40         }\r
41 private:\r
42         boost::posix_time::ptime time_;\r
43         long period_;\r
44 };\r
45 \r
46 struct frame_consumer_device::implementation\r
47 {\r
48 public:\r
49         implementation(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers) \r
50                 : frame_processor_(frame_processor), consumers_(consumers), fmt_(format_desc)\r
51         {\r
52                 if(consumers.empty())\r
53                         BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
54                                 << msg_info("frame_consumer_device requires atleast one consumer."));\r
55 \r
56                 //if(std::any_of(consumers.begin(), consumers.end(), \r
57                 //      [&](const frame_consumer_ptr& pConsumer)\r
58                 //      { return pConsumer->get_video_format_desc() != format_desc;}))\r
59                 //{\r
60                 //      BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
61                 //              << msg_info("All consumers must have same frameformat as frame_consumer_device."));\r
62                 //}\r
63 \r
64                 needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));\r
65                 frame_buffer_.set_capacity(3);\r
66                 is_running_ = true;\r
67                 display_thread_ = boost::thread([=]{run();});\r
68         }\r
69 \r
70         ~implementation()\r
71         {\r
72                 is_running_ = false;\r
73                 display_thread_.join();\r
74         }\r
75                                 \r
76         void run()\r
77         {\r
78                 win32_exception::install_handler();\r
79                                 \r
80                 video_sync_clock clock(fmt_);\r
81                                 \r
82                 while(is_running_)\r
83                 {\r
84                         if(needs_clock_)\r
85                                 clock.synchronize();\r
86                         \r
87                         frame_ptr frame;\r
88                         while((frame == nullptr || frame == frame::empty()) && is_running_)                     \r
89                                 frame_processor_->receive(frame);\r
90                         \r
91                         display_frame(frame);                   \r
92                 }\r
93         }\r
94 \r
95         void display_frame(const frame_ptr& frame)\r
96         {\r
97                 BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)\r
98                 {\r
99                         try\r
100                         {\r
101                                 consumer->prepare(frame);\r
102                                 prepared_frames_.push_back(frame);\r
103 \r
104                                 if(prepared_frames_.size() > 2)\r
105                                 {\r
106                                         consumer->display(prepared_frames_.front());\r
107                                         prepared_frames_.pop_front();\r
108                                 }\r
109                         }\r
110                         catch(...)\r
111                         {\r
112                                 CASPAR_LOG_CURRENT_EXCEPTION();\r
113                                 boost::range::remove_erase(consumers_, consumer);\r
114                                 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
115                                 if(consumers_.empty())\r
116                                 {\r
117                                         CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
118                                         is_running_ = false;\r
119                                 }\r
120                         }\r
121                 }\r
122         }\r
123 \r
124         std::deque<frame_ptr> prepared_frames_;\r
125                 \r
126         boost::thread display_thread_;\r
127 \r
128         tbb::atomic<bool> is_running_;\r
129         tbb::concurrent_bounded_queue<frame_ptr> frame_buffer_;\r
130 \r
131         bool needs_clock_;\r
132         std::vector<frame_consumer_ptr> consumers_;\r
133 \r
134         const video_format_desc& fmt_;\r
135 \r
136         frame_processor_device_ptr frame_processor_;\r
137 };\r
138 \r
139 frame_consumer_device::frame_consumer_device(const frame_processor_device_ptr& frame_processor, const video_format_desc& format_desc, const std::vector<frame_consumer_ptr>& consumers)\r
140         : impl_(new implementation(frame_processor, format_desc, consumers)){}\r
141 }}