]> git.sesse.net Git - casparcg/blob - core/producer/frame_producer_device.cpp
2.0.0.2: Fixed proper destruction order of executor(threads) to avoid access violatio...
[casparcg] / core / producer / frame_producer_device.cpp
#include "../StdAfx.h"

#include "frame_producer_device.h"

#include "../mixer/frame/draw_frame.h"
#include "../mixer/frame_factory.h"

#include "layer.h"

#include <common/concurrency/executor.h>

#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/lexical_cast.hpp>

#include <tbb/parallel_for.h>
#include <tbb/spin_mutex.h>

#include <array>
#include <functional>
#include <memory>
20 \r
21 namespace caspar { namespace core {\r
22 \r
23 struct frame_producer_device::implementation : boost::noncopyable\r
24 {               \r
25         std::array<layer, frame_producer_device::MAX_LAYER> layers_;            \r
26 \r
27         tbb::spin_mutex output_mutex_;\r
28         output_func output_;\r
29 \r
30         const safe_ptr<frame_factory> factory_;\r
31         \r
32         mutable executor executor_;\r
33 public:\r
34         implementation(const safe_ptr<frame_factory>& factory, const output_func& output)  \r
35                 : factory_(factory)\r
36                 , output_(output)\r
37         {\r
38                 for(size_t n = 0; n < layers_.size(); ++n)\r
39                         layers_[n] = layer(n);\r
40 \r
41                 executor_.start();\r
42                 executor_.begin_invoke([=]{tick();});\r
43         }\r
44 \r
45         ~implementation()\r
46         {\r
47                 CASPAR_LOG(info) << "Shutting down producer-device.";\r
48         }\r
49                                         \r
50         void tick()\r
51         {               \r
52                 output_func output;\r
53                 {\r
54                         tbb::spin_mutex::scoped_lock lock(output_mutex_);\r
55                         output = output_;\r
56                 }\r
57                 output(draw());\r
58                 executor_.begin_invoke([=]{tick();});\r
59         }\r
60         \r
61         std::vector<safe_ptr<draw_frame>> draw()\r
62         {       \r
63                 std::vector<safe_ptr<draw_frame>> frames(layers_.size(), draw_frame::empty());\r
64                 tbb::parallel_for(tbb::blocked_range<size_t>(0, frames.size(), 1), // Use grain-size = 1.\r
65                 [&](const tbb::blocked_range<size_t>& r)\r
66                 {\r
67                         for(size_t i = r.begin(); i != r.end(); ++i)\r
68                                 frames[i] = layers_[i].receive();\r
69                 });             \r
70                 boost::range::remove_erase(frames, draw_frame::eof());\r
71                 boost::range::remove_erase(frames, draw_frame::empty());\r
72                 return frames;\r
73         }\r
74 \r
75         void load(size_t index, const safe_ptr<frame_producer>& producer, bool play_on_load)\r
76         {\r
77                 check_bounds(index);\r
78                 producer->initialize(factory_);\r
79                 executor_.invoke([&]\r
80                 {\r
81                         layers_[index].load(producer, play_on_load);\r
82                 });\r
83         }\r
84                         \r
85         void preview(size_t index, const safe_ptr<frame_producer>& producer)\r
86         {\r
87                 check_bounds(index);\r
88                 producer->initialize(factory_);\r
89                 executor_.invoke([&]\r
90                 {                       \r
91                         layers_[index].preview(producer);\r
92                 });\r
93         }\r
94 \r
95         void pause(size_t index)\r
96         {               \r
97                 check_bounds(index);\r
98                 executor_.invoke([&]\r
99                 {\r
100                         layers_[index].pause();\r
101                 });\r
102         }\r
103 \r
104         void play(size_t index)\r
105         {               \r
106                 check_bounds(index);\r
107                 executor_.invoke([&]\r
108                 {\r
109                         layers_[index].play();\r
110                 });\r
111         }\r
112 \r
113         void stop(size_t index)\r
114         {               \r
115                 check_bounds(index);\r
116                 executor_.invoke([&]\r
117                 {\r
118                         layers_[index].stop();\r
119                 });\r
120         }\r
121 \r
122         void clear(size_t index)\r
123         {\r
124                 check_bounds(index);\r
125                 executor_.invoke([&]\r
126                 {\r
127                         layers_[index] = std::move(layer());\r
128                 });\r
129         }\r
130                 \r
131         void clear()\r
132         {\r
133                 executor_.invoke([&]\r
134                 {\r
135                         for(auto it = layers_.begin(); it != layers_.end(); ++it)\r
136                                 *it = std::move(layer());\r
137                 });\r
138         }       \r
139         \r
140         void swap_layer(size_t index, size_t other_index)\r
141         {\r
142                 check_bounds(index);\r
143                 check_bounds(other_index);\r
144                 executor_.invoke([&]\r
145                 {\r
146                         layers_[index].swap(layers_[other_index]);\r
147                 });\r
148         }\r
149 \r
150         void swap_layer(size_t index, size_t other_index, frame_producer_device& other)\r
151         {\r
152                 check_bounds(index);\r
153                 check_bounds(other_index);\r
154                 executor_.invoke([&]\r
155                 {\r
156                         layers_[index].swap(other.impl_->layers_[other_index]);\r
157                 });\r
158         }\r
159 \r
160         void swap_output(frame_producer_device& other)\r
161         {\r
162                 tbb::spin_mutex::scoped_lock lock1(output_mutex_);\r
163                 tbb::spin_mutex::scoped_lock lock2(other.impl_->output_mutex_);\r
164                 output_.swap(other.impl_->output_);\r
165         }\r
166 \r
167         void check_bounds(size_t index) const\r
168         {\r
169                 if(index < 0 || index >= frame_producer_device::MAX_LAYER)\r
170                         BOOST_THROW_EXCEPTION(out_of_range() << arg_name_info("index") << arg_value_info(boost::lexical_cast<std::string>(index)));\r
171         }\r
172         \r
173         boost::unique_future<safe_ptr<frame_producer>> foreground(size_t index) const\r
174         {\r
175                 check_bounds(index);\r
176                 return executor_.begin_invoke([=]() -> safe_ptr<frame_producer>\r
177                 {                       \r
178                         return layers_[index].foreground();\r
179                 });\r
180         }\r
181 };\r
182 \r
183 frame_producer_device::frame_producer_device(const safe_ptr<frame_factory>& factory, const output_func& output) : impl_(new implementation(factory, output)){}\r
184 frame_producer_device::frame_producer_device(frame_producer_device&& other) : impl_(std::move(other.impl_)){}\r
185 void frame_producer_device::load(size_t index, const safe_ptr<frame_producer>& producer, bool play_on_load){impl_->load(index, producer, play_on_load);}\r
186 void frame_producer_device::preview(size_t index, const safe_ptr<frame_producer>& producer){impl_->preview(index, producer);}\r
187 void frame_producer_device::pause(size_t index){impl_->pause(index);}\r
188 void frame_producer_device::play(size_t index){impl_->play(index);}\r
189 void frame_producer_device::stop(size_t index){impl_->stop(index);}\r
190 void frame_producer_device::clear(size_t index){impl_->clear(index);}\r
191 void frame_producer_device::clear(){impl_->clear();}\r
192 void frame_producer_device::swap_layer(size_t index, size_t other_index){impl_->swap_layer(index, other_index);}\r
193 void frame_producer_device::swap_layer(size_t index, size_t other_index, frame_producer_device& other){impl_->swap_layer(index, other_index, other);}\r
194 void frame_producer_device::swap_output(frame_producer_device& other){impl_->swap_output(other);}\r
195 boost::unique_future<safe_ptr<frame_producer>> frame_producer_device::foreground(size_t index) const{   return impl_->foreground(index);}\r
196 }}