2 * Copyright 2013 Sveriges Television AB http://casparcg.com/
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Robert Nagy, ronag89@gmail.com
22 #include "../stdafx.h"
24 #include "channel_producer.h"
26 #include <core/monitor/monitor.h>
27 #include <core/consumer/frame_consumer.h>
28 #include <core/consumer/output.h>
29 #include <core/producer/frame_producer.h>
30 #include <core/producer/framerate/framerate_producer.h>
31 #include <core/video_channel.h>
33 #include <core/frame/frame.h>
34 #include <core/frame/pixel_format.h>
35 #include <core/frame/audio_channel_layout.h>
36 #include <core/frame/draw_frame.h>
37 #include <core/frame/frame_factory.h>
38 #include <core/video_format.h>
40 #include <boost/thread/once.hpp>
41 #include <boost/lexical_cast.hpp>
42 #include <boost/property_tree/ptree.hpp>
43 #include <boost/range/algorithm/copy.hpp>
45 #include <common/except.h>
46 #include <common/memory.h>
47 #include <common/memcpy.h>
48 #include <common/semaphore.h>
49 #include <common/future.h>
51 #include <tbb/concurrent_queue.h>
54 #pragma warning (push)
55 #pragma warning (disable : 4244)
59 #define __STDC_CONSTANT_MACROS
60 #define __STDC_LIMIT_MACROS
61 #include <libavcodec/avcodec.h>
62 #include <libavformat/avformat.h>
68 #include <modules/ffmpeg/producer/muxer/frame_muxer.h>
69 #include <modules/ffmpeg/producer/util/util.h>
75 namespace caspar { namespace reroute {
77 class channel_consumer : public core::frame_consumer
79 core::monitor::subject monitor_subject_;
80 tbb::concurrent_bounded_queue<core::const_frame> frame_buffer_;
81 core::video_format_desc format_desc_;
82 core::audio_channel_layout channel_layout_ = core::audio_channel_layout::invalid();
85 tbb::atomic<bool> is_running_;
86 tbb::atomic<int64_t> current_age_;
87 semaphore frames_available_ { 0 };
91 channel_consumer(int frames_delay)
92 : consumer_index_(next_consumer_index())
93 , frames_delay_(frames_delay)
97 frame_buffer_.set_capacity(3 + frames_delay);
100 static int next_consumer_index()
102 static tbb::atomic<int> consumer_index_counter;
103 static boost::once_flag consumer_index_counter_initialized;
105 boost::call_once(consumer_index_counter_initialized, [&]()
107 consumer_index_counter = 0;
110 return ++consumer_index_counter;
119 std::future<bool> send(core::const_frame frame) override
121 bool pushed = frame_buffer_.try_push(frame);
124 frames_available_.release();
126 return make_ready_future(is_running_.load());
130 const core::video_format_desc& format_desc,
131 const core::audio_channel_layout& channel_layout,
132 int channel_index) override
134 format_desc_ = format_desc;
135 channel_layout_ = channel_layout;
136 channel_index_ = channel_index;
139 std::wstring name() const override
141 return L"channel-consumer";
144 int64_t presentation_frame_age_millis() const override
149 std::wstring print() const override
151 return L"[channel-consumer|" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";
154 boost::property_tree::wptree info() const override
156 boost::property_tree::wptree info;
157 info.add(L"type", L"channel-consumer");
158 info.add(L"channel-index", channel_index_);
162 bool has_synchronization_clock() const override
167 int buffer_depth() const override
172 int index() const override
174 return 78500 + consumer_index_;
177 core::monitor::subject& monitor_output() override
179 return monitor_subject_;
184 const core::video_format_desc& get_video_format_desc()
189 const core::audio_channel_layout& get_audio_channel_layout()
191 return channel_layout_;
194 void block_until_first_frame_available()
196 if (!frames_available_.try_acquire(1 + frames_delay_, boost::chrono::seconds(2)))
198 << print() << L" Timed out while waiting for first frame";
201 core::const_frame receive()
203 core::const_frame frame = core::const_frame::empty();
208 if (frame_buffer_.try_pop(frame))
209 current_age_ = frame.get_age_millis();
220 core::video_format_desc get_progressive_format(core::video_format_desc format_desc)
222 if (format_desc.field_count == 1)
225 format_desc.framerate *= 2;
226 format_desc.fps *= 2.0;
227 format_desc.audio_cadence = core::find_audio_cadence(format_desc.framerate);
228 format_desc.time_scale *= 2;
229 format_desc.field_count = 1;
234 class channel_producer : public core::frame_producer_base
236 core::monitor::subject monitor_subject_;
238 const spl::shared_ptr<core::frame_factory> frame_factory_;
239 const core::video_format_desc output_format_desc_;
240 const spl::shared_ptr<channel_consumer> consumer_;
241 core::constraints pixel_constraints_;
242 ffmpeg::frame_muxer muxer_;
244 std::queue<core::draw_frame> frame_buffer_;
247 explicit channel_producer(const core::frame_producer_dependencies& dependecies, const spl::shared_ptr<core::video_channel>& channel, int frames_delay)
248 : frame_factory_(dependecies.frame_factory)
249 , output_format_desc_(dependecies.format_desc)
250 , consumer_(spl::make_shared<channel_consumer>(frames_delay))
252 channel->video_format_desc().framerate,
253 { ffmpeg::create_input_pad(channel->video_format_desc(), channel->audio_channel_layout().num_channels) },
254 dependecies.frame_factory,
255 get_progressive_format(channel->video_format_desc()),
256 channel->audio_channel_layout(),
261 pixel_constraints_.width.set(output_format_desc_.width);
262 pixel_constraints_.height.set(output_format_desc_.height);
263 channel->output().add(consumer_);
264 consumer_->block_until_first_frame_available();
265 CASPAR_LOG(info) << print() << L" Initialized";
271 CASPAR_LOG(info) << print() << L" Uninitialized";
276 core::draw_frame receive_impl() override
278 if (!muxer_.video_ready() || !muxer_.audio_ready())
280 auto read_frame = consumer_->receive();
282 if (read_frame == core::const_frame::empty() || read_frame.image_data().empty())
283 return core::draw_frame::late();
285 auto video_frame = ffmpeg::create_frame();
287 video_frame->data[0] = const_cast<uint8_t*>(read_frame.image_data().begin());
288 video_frame->linesize[0] = static_cast<int>(read_frame.width()) * 4;
289 video_frame->format = AVPixelFormat::AV_PIX_FMT_BGRA;
290 video_frame->width = static_cast<int>(read_frame.width());
291 video_frame->height = static_cast<int>(read_frame.height());
292 video_frame->interlaced_frame = consumer_->get_video_format_desc().field_mode != core::field_mode::progressive;
293 video_frame->top_field_first = consumer_->get_video_format_desc().field_mode == core::field_mode::upper ? 1 : 0;
294 video_frame->key_frame = 1;
296 muxer_.push(video_frame);
299 std::make_shared<core::mutable_audio_buffer>(
300 read_frame.audio_data().begin(),
301 read_frame.audio_data().end())
305 auto frame = muxer_.poll();
307 if (frame == core::draw_frame::empty())
308 return core::draw_frame::late();
313 std::wstring name() const override
315 return L"channel-producer";
318 std::wstring print() const override
320 return L"channel-producer[]";
323 core::constraints& pixel_constraints() override
325 return pixel_constraints_;
328 boost::property_tree::wptree info() const override
330 boost::property_tree::wptree info;
331 info.add(L"type", L"channel-producer");
335 core::monitor::subject& monitor_output() override
337 return monitor_subject_;
340 boost::rational<int> current_framerate() const
342 return muxer_.out_framerate();
346 spl::shared_ptr<core::frame_producer> create_channel_producer(
347 const core::frame_producer_dependencies& dependencies,
348 const spl::shared_ptr<core::video_channel>& channel,
351 auto producer = spl::make_shared<channel_producer>(dependencies, channel, frames_delay);
353 return core::create_framerate_producer(
355 [producer] { return producer->current_framerate(); },
356 dependencies.format_desc.framerate,
357 dependencies.format_desc.field_mode,
358 dependencies.format_desc.audio_cadence);