]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer_device.cpp
2.0.0.2:
[casparcg] / core / consumer / frame_consumer_device.cpp
1 #include "../StdAfx.h"\r
2 \r
3 #ifdef _MSC_VER\r
4 #pragma warning (disable : 4244)\r
5 #endif\r
6 \r
7 #include "frame_consumer_device.h"\r
8 \r
9 #include "../video_format.h"\r
10 \r
11 #include <common/concurrency/executor.h>\r
12 #include <common/utility/timer.h>\r
13 \r
14 #include <boost/range/algorithm_ext/erase.hpp>\r
15 #include <boost/range/algorithm.hpp>\r
16 \r
17 namespace caspar { namespace core {\r
18         \r
19 struct frame_consumer_device::implementation\r
20 {\r
21         timer clock_;\r
22         executor executor_;     \r
23 \r
24         size_t depth_;\r
25         std::deque<safe_ptr<const read_frame>> buffer_;         \r
26 \r
27         std::vector<safe_ptr<frame_consumer>> consumers_;\r
28         \r
29         const video_format_desc fmt_;\r
30 \r
31 public:\r
32         implementation(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) \r
33                 : consumers_(consumers)\r
34                 , fmt_(format_desc)\r
35         {               \r
36                 if(consumers_.empty())\r
37                         BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("No consumer."));\r
38 \r
39                 std::vector<size_t> depths;\r
40                 boost::range::transform(consumers_, std::back_inserter(depths), std::mem_fn(&frame_consumer::buffer_depth));\r
41                 depth_ = *boost::range::max_element(depths);\r
42                 executor_.set_capacity(3);\r
43                 executor_.start();\r
44         }\r
45                         \r
46         void consume(safe_ptr<const read_frame>&& frame)\r
47         {               \r
48                 executor_.begin_invoke([=]\r
49                 {\r
50                         buffer_.push_back(frame);\r
51 \r
52                         if(buffer_.size() < depth_)\r
53                                 return;\r
54         \r
55                         boost::range::for_each(consumers_, [&](const safe_ptr<frame_consumer>& consumer)\r
56                         {\r
57                                 try\r
58                                 {\r
59                                         consumer->send(buffer_[consumer->buffer_depth()-1]);\r
60                                 }\r
61                                 catch(...)\r
62                                 {\r
63                                         CASPAR_LOG_CURRENT_EXCEPTION();\r
64                                         boost::range::remove_erase(consumers_, consumer);\r
65                                         CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
66                                 }\r
67                         });\r
68 \r
69                         buffer_.pop_front();\r
70         \r
71                         clock_.wait(fmt_.fps);\r
72                 });\r
73         }\r
74 };\r
75 \r
76 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
77 frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc, const std::vector<safe_ptr<frame_consumer>>& consumers) : impl_(new implementation(format_desc, consumers)){}\r
78 void frame_consumer_device::consume(safe_ptr<const read_frame>&& future_frame) { impl_->consume(std::move(future_frame)); }\r
79 }}