2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
22 #include "../stdafx.h"
24 #include "channel_producer.h"
26 #include <core/monitor/monitor.h>
27 #include <core/consumer/frame_consumer.h>
28 #include <core/consumer/output.h>
29 #include <core/producer/frame_producer.h>
30 #include <core/producer/framerate/framerate_producer.h>
31 #include <core/video_channel.h>
33 #include <core/frame/frame.h>
34 #include <core/frame/pixel_format.h>
35 #include <core/frame/audio_channel_layout.h>
36 #include <core/frame/draw_frame.h>
37 #include <core/frame/frame_factory.h>
38 #include <core/video_format.h>
40 #include <boost/thread/once.hpp>
41 #include <boost/lexical_cast.hpp>
42 #include <boost/property_tree/ptree.hpp>
43 #include <boost/range/algorithm/copy.hpp>
45 #include <common/except.h>
46 #include <common/memory.h>
47 #include <common/semaphore.h>
48 #include <common/future.h>
50 #include <tbb/concurrent_queue.h>
53 #pragma warning (push)
54 #pragma warning (disable : 4244)
58 #define __STDC_CONSTANT_MACROS
59 #define __STDC_LIMIT_MACROS
60 #include <libavcodec/avcodec.h>
61 #include <libavformat/avformat.h>
67 #include <modules/ffmpeg/producer/muxer/frame_muxer.h>
68 #include <modules/ffmpeg/producer/util/util.h>
72 namespace caspar { namespace reroute {
// Frame consumer that stores the frames handed to send() in a bounded queue so
// that they can be fetched again through receive().
// NOTE(review): brace-only lines and some statements are not present in this
// listing (the embedded numbering has gaps). Members used below such as
// consumer_index_, frames_delay_ and channel_index_ have no visible
// declaration here - confirm against the complete file.
74 class channel_consumer : public core::frame_consumer
// Subject handed out by monitor_output().
76 core::monitor::subject monitor_subject_;
// Bounded queue written by send() and read by receive().
77 tbb::concurrent_bounded_queue<core::const_frame> frame_buffer_;
// Video format and audio layout recorded when the consumer is initialised;
// the layout starts out as audio_channel_layout::invalid().
78 core::video_format_desc format_desc_;
79 core::audio_channel_layout channel_layout_ = core::audio_channel_layout::invalid();
// Value reported back to the caller of send().
82 tbb::atomic<bool> is_running_;
// Age (milliseconds) of the most recent frame popped in receive().
83 tbb::atomic<int64_t> current_age_;
// Starts at 0; released in send() and waited on in
// block_until_first_frame_available().
84 semaphore frames_available_ { 0 };
// Creates a consumer with a fresh consumer index and a queue capacity of
// 3 + frames_delay frames.
// NOTE(review): listing lines 91-93 of the constructor body are not visible;
// presumably they initialise the atomics - confirm.
88 channel_consumer(int frames_delay)
89 : consumer_index_(next_consumer_index())
90 , frames_delay_(frames_delay)
94 frame_buffer_.set_capacity(3 + frames_delay);
// Returns the next value of a function-local counter shared by all
// consumers. The counter is set to 0 exactly once through boost::call_once and
// pre-incremented on every call, so the first value returned is 1.
97 static int next_consumer_index()
99 static tbb::atomic<int> consumer_index_counter;
100 static boost::once_flag consumer_index_counter_initialized;
102 boost::call_once(consumer_index_counter_initialized, [&]()
104 consumer_index_counter = 0;
107 return ++consumer_index_counter;
// Offers the frame to the queue; try_push reports whether it was accepted.
// Returns an already-completed future carrying the current is_running_ value.
116 std::future<bool> send(core::const_frame frame) override
118 bool pushed = frame_buffer_.try_push(frame);
// NOTE(review): the line between these two statements is not visible;
// presumably the release only happens when `pushed` is true - confirm.
121 frames_available_.release();
123 return make_ready_future(is_running_.load());
// Parameter list and body of the initialisation override (its name line is not
// visible in this listing): records the channel's video format, audio layout
// and channel index in the corresponding members.
127 const core::video_format_desc& format_desc,
128 const core::audio_channel_layout& channel_layout,
129 int channel_index) override
131 format_desc_ = format_desc;
132 channel_layout_ = channel_layout;
133 channel_index_ = channel_index;
// Fixed consumer name.
136 std::wstring name() const override
138 return L"channel-consumer";
// NOTE(review): body not visible here; presumably reports current_age_ -
// confirm.
141 int64_t presentation_frame_age_millis() const override
// Human-readable identifier that embeds the channel index.
146 std::wstring print() const override
148 return L"[channel-consumer|" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
// Builds a property tree with the consumer type and the channel index.
// NOTE(review): the return statement is not visible in this listing.
151 boost::property_tree::wptree info() const override
153 boost::property_tree::wptree info;
154 info.add(L"type", L"channel-consumer");
155 info.add(L"channel-index", channel_index_);
// NOTE(review): body not visible in this listing.
159 bool has_synchronization_clock() const override
// NOTE(review): body not visible in this listing.
164 int buffer_depth() const override
// Consumer index offset by the fixed base 78500.
169 int index() const override
171 return 78500 + consumer_index_;
// Exposes the consumer's monitor subject.
174 core::monitor::subject& monitor_output() override
176 return monitor_subject_;
// NOTE(review): body not visible here; presumably returns format_desc_ -
// confirm.
181 const core::video_format_desc& get_video_format_desc()
// Audio channel layout recorded at initialisation.
186 const core::audio_channel_layout& get_audio_channel_layout()
188 return channel_layout_;
// Tries to acquire 1 + frames_delay_ permits from frames_available_ with a
// timeout of two seconds; when that fails, the message below is streamed.
// NOTE(review): the head of the logging statement (listing line 194) is not
// visible here.
191 void block_until_first_frame_available()
193 if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
195 << print() << L" Timed out while waiting for first frame";
// Starts from the empty frame and tries to pop one from the queue; when a frame
// was popped, its age is stored in current_age_.
// NOTE(review): listing lines 201-204 and the final return are not visible.
198 core::const_frame receive()
200 core::const_frame frame = core::const_frame::empty();
205 if (frame_buffer_.try_pop(frame))
206 current_age_ = frame.get_age_millis();
// Derives a single-field variant of a video format. The descriptor is taken by
// value, so the caller's copy is never modified.
// For a format whose field_count is not 1, the frame rate, fps and time scale
// are doubled, the audio cadence is looked up again for the doubled frame rate
// and field_count is set to 1.
// NOTE(review): the statement controlled by the `if` and the final return are
// not visible in this listing; presumably a format that already has
// field_count == 1 is returned unchanged - confirm.
217 core::video_format_desc get_progressive_format(core::video_format_desc format_desc)
219 if (format_desc.field_count == 1)
222 format_desc.framerate *= 2;
223 format_desc.fps *= 2.0;
// Cadence is recomputed after the frame rate has been doubled.
224 format_desc.audio_cadence = core::find_audio_cadence(format_desc.framerate);
225 format_desc.time_scale *= 2;
226 format_desc.field_count = 1;
// Frame producer that attaches a channel_consumer to a video channel's output
// and turns the frames that consumer collects back into draw frames by way of
// an ffmpeg::frame_muxer.
// NOTE(review): brace-only lines and some statements are not present in this
// listing (the embedded numbering has gaps) - confirm details against the
// complete file.
231 class channel_producer : public core::frame_producer_base
// Subject handed out by monitor_output().
233 core::monitor::subject monitor_subject_;
// Taken from the producer dependencies in the constructor.
235 const spl::shared_ptr<core::frame_factory> frame_factory_;
236 const core::video_format_desc output_format_desc_;
// Consumer that is registered with the source channel's output.
237 const spl::shared_ptr<channel_consumer> consumer_;
// Width/height are set from output_format_desc_ in the constructor.
238 core::constraints pixel_constraints_;
// Receives the video/audio pushed in receive_impl() and is polled for output.
239 ffmpeg::frame_muxer muxer_;
// NOTE(review): no use of this queue is visible in this listing.
241 std::queue<core::draw_frame> frame_buffer_;
// Builds the producer for `channel`:
//  - a channel_consumer is created with the requested frame delay;
//  - the muxer arguments use the channel's frame rate, an input pad created
//    from the channel's video format and audio channel count, the frame
//    factory, the channel's audio layout, and either the channel's own video
//    format (no_auto_deinterlace) or its get_progressive_format() variant;
//  - the pixel constraints take the output format's width and height;
//  - the consumer is added to the channel's output and the constructor then
//    waits in block_until_first_frame_available().
// NOTE(review): the `frames_delay` parameter line, the head of the muxer_
// initialiser and two of its arguments (listing lines 247, 252, 258-259) are
// not visible here.
244 explicit channel_producer(
245 const core::frame_producer_dependencies& dependecies,
246 const spl::shared_ptr<core::video_channel>& channel,
248 bool no_auto_deinterlace)
249 : frame_factory_(dependecies.frame_factory)
250 , output_format_desc_(dependecies.format_desc)
251 , consumer_(spl::make_shared<channel_consumer>(frames_delay))
253 channel->video_format_desc().framerate,
254 { ffmpeg::create_input_pad(channel->video_format_desc(), channel->audio_channel_layout().num_channels) },
255 dependecies.frame_factory,
256 no_auto_deinterlace ? channel->video_format_desc() : get_progressive_format(channel->video_format_desc()),
257 channel->audio_channel_layout(),
260 !no_auto_deinterlace)
262 pixel_constraints_.width.set(output_format_desc_.width);
263 pixel_constraints_.height.set(output_format_desc_.height);
264 channel->output().add(consumer_);
265 consumer_->block_until_first_frame_available();
266 CASPAR_LOG(info) << print() << L" Initialized";
// NOTE(review): presumably part of the destructor, whose header and remaining
// body are not visible in this listing - confirm.
272 CASPAR_LOG(info) << print() << L" Uninitialized";
// Produces the next draw frame.
// While the muxer reports that video or audio is not ready, a frame is fetched
// from the consumer; an empty frame (or one without image data) yields
// draw_frame::late(). Otherwise the frame's pixels and audio samples are fed
// to the muxer, which is then polled; an empty poll result also yields
// draw_frame::late().
// NOTE(review): the braces that delimit the `if` body and the final return are
// not visible in this listing.
277 core::draw_frame receive_impl() override
279 if (!muxer_.video_ready() || !muxer_.audio_ready())
281 auto read_frame = consumer_->receive();
283 if (read_frame == core::const_frame::empty() || read_frame.image_data().empty())
284 return core::draw_frame::late();
// Describe the received image to the muxer as a single BGRA plane.
286 auto video_frame = ffmpeg::create_frame();
// The frame's pixel storage is referenced, not copied; the const_cast is
// needed because the image data is exposed as const.
288 video_frame->data[0] = const_cast<uint8_t*>(read_frame.image_data().begin());
// Row stride: four bytes per pixel.
289 video_frame->linesize[0] = static_cast<int>(read_frame.width()) * 4;
290 video_frame->format = AVPixelFormat::AV_PIX_FMT_BGRA;
291 video_frame->width = static_cast<int>(read_frame.width());
292 video_frame->height = static_cast<int>(read_frame.height());
// Interlacing flags follow the field mode reported by the consumer.
293 video_frame->interlaced_frame = consumer_->get_video_format_desc().field_mode != core::field_mode::progressive;
294 video_frame->top_field_first = consumer_->get_video_format_desc().field_mode == core::field_mode::upper ? 1 : 0;
295 video_frame->key_frame = 1;
297 muxer_.push(video_frame);
// A mutable audio buffer is built from the frame's audio samples.
// NOTE(review): the call that receives this buffer (listing lines 298-299) is
// not visible; presumably it is a second muxer_.push - confirm.
300 std::make_shared<core::mutable_audio_buffer>(
301 read_frame.audio_data().begin(),
302 read_frame.audio_data().end())
306 auto frame = muxer_.poll();
308 if (frame == core::draw_frame::empty())
309 return core::draw_frame::late();
// Fixed producer name.
314 std::wstring name() const override
316 return L"channel-producer";
// Fixed identifier used as the prefix of the log messages above.
319 std::wstring print() const override
321 return L"channel-producer[]";
// Mutable access to the constraints set up in the constructor.
324 core::constraints& pixel_constraints() override
326 return pixel_constraints_;
// Builds a property tree holding the producer type.
// NOTE(review): the return statement is not visible in this listing.
329 boost::property_tree::wptree info() const override
331 boost::property_tree::wptree info;
332 info.add(L"type", L"channel-producer");
// Exposes the producer's monitor subject.
336 core::monitor::subject& monitor_output() override
338 return monitor_subject_;
// Frame rate currently reported by the muxer for its output.
341 boost::rational<int> current_framerate() const
343 return muxer_.out_framerate();
// Factory for the channel reroute producer.
// Creates a channel_producer for `channel` and returns it wrapped by
// core::create_framerate_producer, which is given a callback reporting the
// producer's current frame rate together with the frame rate, field mode and
// audio cadence of the destination format in `dependencies`.
// NOTE(review): the `frames_delay` parameter line (listing line 350) and the
// first argument of create_framerate_producer (listing line 356) are not
// visible in this listing.
347 spl::shared_ptr<core::frame_producer> create_channel_producer(
348 const core::frame_producer_dependencies& dependencies,
349 const spl::shared_ptr<core::video_channel>& channel,
351 bool no_auto_deinterlace)
353 auto producer = spl::make_shared<channel_producer>(dependencies, channel, frames_delay, no_auto_deinterlace);
// The lambda captures the shared pointer by value.
355 return core::create_framerate_producer(
357 [producer] { return producer->current_framerate(); },
358 dependencies.format_desc.framerate,
359 dependencies.format_desc.field_mode,
360 dependencies.format_desc.audio_cadence);