]> git.sesse.net Git - casparcg/blob - core/consumer/output.cpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / core / consumer / output.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Robert Nagy, ronag89@gmail.com
20 */
21
22 #include "../StdAfx.h"
23
24 #ifdef _MSC_VER
25 #pragma warning (disable : 4244)
26 #endif
27
28 #include "output.h"
29
30 #include "frame_consumer.h"
31 #include "port.h"
32
33 #include "../video_format.h"
34 #include "../frame/frame.h"
35
36 #include <common/assert.h>
37 #include <common/future.h>
38 #include <common/executor.h>
39 #include <common/diagnostics/graph.h>
40 #include <common/prec_timer.h>
41 #include <common/memshfl.h>
42 #include <common/env.h>
43 #include <common/linq.h>
44
45 #include <boost/circular_buffer.hpp>
46 #include <boost/lexical_cast.hpp>
47 #include <boost/property_tree/ptree.hpp>
48 #include <boost/timer.hpp>
49
50 #include <functional>
51
52 namespace caspar { namespace core {
53
54 struct output::impl
55 {               
56         spl::shared_ptr<diagnostics::graph>     graph_;
57         spl::shared_ptr<monitor::subject>       monitor_subject_        = spl::make_shared<monitor::subject>("/output");
58         const int                                                       channel_index_;
59         video_format_desc                                       format_desc_;
60         std::map<int, port>                                     ports_; 
61         prec_timer                                                      sync_timer_;
62         boost::circular_buffer<const_frame>     frames_;
63         executor                                                        executor_                       = { L"output" };
64 public:
65         impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) 
66                 : graph_(std::move(graph))
67                 , channel_index_(channel_index)
68                 , format_desc_(format_desc)
69         {
70                 graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8));
71         }       
72         
73         void add(int index, spl::shared_ptr<frame_consumer> consumer)
74         {               
75                 remove(index);
76
77                 consumer->initialize(format_desc_, channel_index_);
78                 
79                 executor_.begin_invoke([this, index, consumer]
80                 {                       
81                         port p(index, channel_index_, std::move(consumer));
82                         p.monitor_output().attach_parent(monitor_subject_);
83                         ports_.insert(std::make_pair(index, std::move(p)));
84                 }, task_priority::high_priority);
85         }
86
87         void add(const spl::shared_ptr<frame_consumer>& consumer)
88         {
89                 add(consumer->index(), consumer);
90         }
91
92         void remove(int index)
93         {               
94                 executor_.begin_invoke([=]
95                 {
96                         auto it = ports_.find(index);
97                         if(it != ports_.end())
98                                 ports_.erase(it);                                       
99                 }, task_priority::high_priority);
100         }
101
102         void remove(const spl::shared_ptr<frame_consumer>& consumer)
103         {
104                 remove(consumer->index());
105         }
106         
107         void video_format_desc(const core::video_format_desc& format_desc)
108         {
109                 executor_.invoke([&]
110                 {
111                         if(format_desc_ == format_desc)
112                                 return;
113
114                         auto it = ports_.begin();
115                         while(it != ports_.end())
116                         {                                               
117                                 try
118                                 {
119                                         it->second.video_format_desc(format_desc);
120                                         ++it;
121                                 }
122                                 catch(...)
123                                 {
124                                         CASPAR_LOG_CURRENT_EXCEPTION();
125                                         ports_.erase(it++);
126                                 }
127                         }
128                         
129                         format_desc_ = format_desc;
130                         frames_.clear();
131                 });
132         }
133
134         std::pair<int, int> minmax_buffer_depth() const
135         {               
136                 if(ports_.empty())
137                         return std::make_pair(0, 0);
138
139                 return cpplinq::from(ports_)
140                         .select(values())
141                         .select(std::mem_fn(&port::buffer_depth))
142                         .aggregate(minmax::initial_value<int>(), minmax());
143         }
144
145         bool has_synchronization_clock() const
146         {
147                 return cpplinq::from(ports_)
148                         .select(values())
149                         .where(std::mem_fn(&port::has_synchronization_clock))
150                         .any();
151         }
152                 
153         void operator()(const_frame input_frame, const core::video_format_desc& format_desc)
154         {
155                 boost::timer frame_timer;
156
157                 video_format_desc(format_desc);         
158
159                 executor_.invoke([=]
160                 {                       
161
162                         if(!has_synchronization_clock())
163                                 sync_timer_.tick(1.0/format_desc_.fps);
164                                 
165                         if(input_frame.size() != format_desc_.size)
166                         {
167                                 CASPAR_LOG(debug) << print() << L" Invalid input frame dimension.";
168                                 return;
169                         }
170
171                         auto minmax = minmax_buffer_depth();
172
173                         frames_.set_capacity(std::max(2, minmax.second - minmax.first) + 1); // std::max(2, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back.
174                         frames_.push_back(input_frame);
175
176                         if(!frames_.full())
177                                 return;
178
179                         std::map<int, std::future<bool>> send_results;
180
181                         // Start invocations
182                         for (auto it = ports_.begin(); it != ports_.end();)
183                         {
184                                 auto& port      = it->second;
185                                 auto& frame     = frames_.at(port.buffer_depth()-minmax.first);
186                                         
187                                 try
188                                 {
189                                         send_results.insert(std::make_pair(it->first, port.send(frame)));
190                                         ++it;
191                                 }
192                                 catch (...)
193                                 {
194                                         CASPAR_LOG_CURRENT_EXCEPTION();
195                                         try
196                                         {
197                                                 send_results.insert(std::make_pair(it->first, port.send(frame)));
198                                                 ++it;
199                                         }
200                                         catch(...)
201                                         {
202                                                 CASPAR_LOG_CURRENT_EXCEPTION();
203                                                 CASPAR_LOG(error) << "Failed to recover consumer: " << port.print() << L". Removing it.";
204                                                 it = ports_.erase(it);
205                                         }
206                                 }
207                         }
208
209                         // Retrieve results
210                         for (auto it = send_results.begin(); it != send_results.end(); ++it)
211                         {
212                                 try
213                                 {
214                                         if (!it->second.get())
215                                                 ports_.erase(it->first);
216                                 }
217                                 catch (...)
218                                 {
219                                         CASPAR_LOG_CURRENT_EXCEPTION();
220                                         ports_.erase(it->first);
221                                 }
222                         }
223                 });
224
225                 graph_->set_value("consume-time", frame_timer.elapsed()*format_desc.fps*0.5);
226         }
227
228         std::wstring print() const
229         {
230                 return L"output[" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
231         }
232
233         std::future<boost::property_tree::wptree> info()
234         {
235                 return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree
236                 {                       
237                         boost::property_tree::wptree info;
238                         for (auto& port : ports_)
239                         {
240                                 info.add_child(L"consumers.consumer", port.second.info())
241                                         .add(L"index", port.first); 
242                         }
243                         return info;
244                 }, task_priority::high_priority));
245         }
246 };
247
248 output::output(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) : impl_(new impl(std::move(graph), format_desc, channel_index)){}
249 void output::add(int index, const spl::shared_ptr<frame_consumer>& consumer){impl_->add(index, consumer);}
250 void output::add(const spl::shared_ptr<frame_consumer>& consumer){impl_->add(consumer);}
251 void output::remove(int index){impl_->remove(index);}
252 void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
253 std::future<boost::property_tree::wptree> output::info() const{return impl_->info();}
254 void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
255 monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
256 }}