]> git.sesse.net Git - casparcg/blob - modules/reroute/producer/reroute_producer.cpp
127285c74f167a7a31ab6b15ace2643fa2f23c1b
[casparcg] / modules / reroute / producer / reroute_producer.cpp
1 /*\r
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
3 *\r
4 * This file is part of CasparCG (www.casparcg.com).\r
5 *\r
6 * CasparCG is free software: you can redistribute it and/or modify\r
7 * it under the terms of the GNU General Public License as published by\r
8 * the Free Software Foundation, either version 3 of the License, or\r
9 * (at your option) any later version.\r
10 *\r
11 * CasparCG is distributed in the hope that it will be useful,\r
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
14 * GNU General Public License for more details.\r
15 *\r
16 * You should have received a copy of the GNU General Public License\r
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
18 *\r
19 * Author: Robert Nagy, ronag89@gmail.com\r
20 */\r
21 \r
22 #include "../stdafx.h"\r
23 \r
24 #include "reroute_producer.h"\r
25 \r
26 #include <core/producer/frame_producer.h>\r
27 #include <core/frame/draw_frame.h>\r
28 #include <core/frame/frame_factory.h>\r
29 #include <core/frame/pixel_format.h>\r
30 #include <core/frame/frame.h>\r
31 #include <core/video_channel.h>\r
32 #include <core/producer/stage.h>\r
33 \r
34 #include <common/except.h>\r
35 #include <common/diagnostics/graph.h>\r
36 #include <common/log.h>\r
37 #include <common/reactive.h>\r
38 \r
39 #include <asmlib.h>\r
40 \r
41 #include <tbb/concurrent_queue.h>\r
42 \r
43 #include <boost/property_tree/ptree.hpp>\r
44 #include <boost/foreach.hpp>\r
45 #include <boost/optional.hpp>\r
46 #include <boost/range/algorithm_ext/push_back.hpp>\r
47 #include <boost/range/numeric.hpp>\r
48 #include <boost/range/adaptor/map.hpp>\r
49 \r
50 #include <queue>\r
51 \r
52 namespace caspar { namespace reroute {\r
53                 \r
54 class reroute_producer : public reactive::observer<std::map<int, core::draw_frame>>\r
55                                            , public core::frame_producer_base\r
56 {\r
57         const spl::shared_ptr<diagnostics::graph>                                               graph_;\r
58         \r
59         tbb::concurrent_bounded_queue<std::map<int, core::draw_frame>>  input_buffer_;\r
60 public:\r
61         explicit reroute_producer() \r
62         {\r
63                 graph_->set_color("late-frame", diagnostics::color(0.6f, 0.3f, 0.3f));\r
64                 graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));\r
65                 graph_->set_text(print());\r
66                 diagnostics::register_graph(graph_);\r
67 \r
68                 input_buffer_.set_capacity(1);\r
69         }\r
70                 \r
71         // observable\r
72 \r
73         void on_next(const std::map<int, core::draw_frame>& frames)\r
74         {\r
75                 if(!input_buffer_.try_push(frames))\r
76                         graph_->set_tag("dropped-frame");               \r
77         }\r
78 \r
79         // frame_producer\r
80                         \r
81         core::draw_frame receive_impl() override\r
82         {               \r
83                 std::map<int, core::draw_frame> frames;\r
84                 if(!input_buffer_.try_pop(frames))\r
85                 {\r
86                         graph_->set_tag("late-frame");\r
87                         return core::draw_frame::late();                \r
88                 }\r
89 \r
90                 return boost::accumulate(frames | boost::adaptors::map_values, core::draw_frame::empty(), core::draw_frame::over);\r
91         }       \r
92                 \r
93         std::wstring print() const override\r
94         {\r
95                 return L"reroute[]";\r
96         }\r
97 \r
98         std::wstring name() const override\r
99         {\r
100                 return L"reroute";\r
101         }\r
102 \r
103         boost::property_tree::wptree info() const override\r
104         {\r
105                 boost::property_tree::wptree info;\r
106                 info.add(L"type", L"rerotue-producer");\r
107                 return info;\r
108         }\r
109                 \r
110         void subscribe(const monitor::observable::observer_ptr& o) override\r
111         {\r
112         }\r
113 \r
114         void unsubscribe(const monitor::observable::observer_ptr& o) override\r
115         {\r
116         }\r
117 };\r
118 \r
119 spl::shared_ptr<core::frame_producer> create_producer(core::video_channel& channel)\r
120 {\r
121         auto producer = spl::make_shared<reroute_producer>();\r
122         \r
123         std::weak_ptr<reactive::observer<std::map<int, core::draw_frame>>> o = producer;\r
124 \r
125         channel.stage().subscribe(o);\r
126 \r
127         return producer;\r
128 }\r
129 \r
130 }}