}\r
\r
#pragma warning(pop)\r
-\r
-#undef Yield
\ No newline at end of file
#include "../../stdafx.h"\r
\r
#include "audio_decoder.h"\r
-\r
#include "audio_resampler.h"\r
\r
#include "../util.h"\r
#include "../../ffmpeg_error.h"\r
\r
+#include <common/concurrency/governor.h>\r
+#include <common/exception/win32_exception.h>\r
#include <core/video_format.h>\r
\r
#include <tbb/cache_aligned_allocator.h>\r
\r
-#include <queue>\r
-\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
#pragma warning (pop)\r
#endif\r
\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct audio_decoder::implementation : boost::noncopyable\r
+struct audio_decoder::implementation : public agent, boost::noncopyable\r
{ \r
- audio_decoder::source_t& source_;\r
int index_;\r
- const safe_ptr<AVCodecContext> codec_context_; \r
- const core::video_format_desc format_desc_;\r
+ std::shared_ptr<AVCodecContext> codec_context_; \r
+ \r
audio_resampler resampler_;\r
-\r
+ \r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- std::queue<safe_ptr<AVPacket>> packets_;\r
+ unbounded_buffer<audio_decoder::source_element_t> source_;\r
+ ITarget<audio_decoder::target_element_t>& target_;\r
+\r
+ governor governor_;\r
+ tbb::atomic<bool> is_running_;\r
+ \r
public:\r
- explicit implementation(audio_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
- : source_(source)\r
- , codec_context_(open_codec(*context, AVMEDIA_TYPE_AUDIO, index_))\r
- , format_desc_(format_desc) \r
+ explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
+ : codec_context_(open_codec(context, AVMEDIA_TYPE_AUDIO, index_))\r
+ , resampler_(format_desc.audio_channels, codec_context_->channels,\r
+ format_desc.audio_sample_rate, codec_context_->sample_rate,\r
+ AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , resampler_(format_desc_.audio_channels, codec_context_->channels,\r
- format_desc_.audio_sample_rate, codec_context_->sample_rate,\r
- AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
- { \r
- }\r
- \r
- std::shared_ptr<core::audio_buffer> poll()\r
- {\r
- auto packet = create_packet();\r
- \r
- if(packets_.empty())\r
- {\r
- if(!try_receive(source_, packet) || packet->stream_index != index_)\r
- return nullptr;\r
- else\r
- packets_.push(packet);\r
- }\r
- \r
- packet = packets_.front();\r
- \r
- std::shared_ptr<core::audio_buffer> audio;\r
- if(packet == loop_packet()) \r
- { \r
- avcodec_flush_buffers(codec_context_.get()); \r
- audio = loop_audio();\r
- } \r
- else\r
- audio = decode(*packet);\r
+ , source_([this](const audio_decoder::source_element_t& packet){return packet->stream_index == index_;})\r
+ , target_(target)\r
+ , governor_(2)\r
+ { \r
+ CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
+\r
+ source.link_target(&source_);\r
\r
- if(packet->size == 0) \r
- packets_.pop();\r
+ is_running_ = true;\r
+ start();\r
+ }\r
\r
- return audio;\r
+ ~implementation()\r
+ {\r
+ is_running_ = false;\r
+ governor_.cancel();\r
+ agent::wait(this);\r
}\r
- \r
- std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
- { \r
- buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
- int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+\r
+ virtual void run()\r
+ {\r
+ win32_exception::install_handler();\r
+\r
+ try\r
+ {\r
+ while(is_running_)\r
+ { \r
+ auto ticket = governor_.acquire();\r
+ auto packet = receive(source_);\r
+ \r
+ if(packet == loop_packet(index_))\r
+ {\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ send(target_, loop_audio());\r
+ continue;\r
+ }\r
+\r
+ if(packet == eof_packet(index_))\r
+ break;\r
+\r
+ auto result = std::make_shared<core::audio_buffer>();\r
+ \r
+ while(packet->size > 0)\r
+ {\r
+ buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+ int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
\r
- int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, &pkt), "[audio_decoder]");\r
+ int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
\r
- // There might be several frames in one packet.\r
- pkt.size -= ret;\r
- pkt.data += ret;\r
+ // There might be several frames in one packet.\r
+ packet->size -= ret;\r
+ packet->data += ret;\r
\r
- buffer1_.resize(written_bytes);\r
+ buffer1_.resize(written_bytes);\r
\r
- buffer1_ = resampler_.resample(std::move(buffer1_));\r
+ buffer1_ = resampler_.resample(std::move(buffer1_));\r
\r
- const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
- const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+ const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+ const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+\r
+ auto audio = make_safe<core::audio_buffer>(samples, samples + n_samples);\r
+\r
+ send(target_, safe_ptr<core::audio_buffer>(audio.get(), [audio, ticket](core::audio_buffer*){}));\r
+ Context::Yield();\r
+ }\r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
\r
- return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
+ send(target_, eof_audio());\r
+\r
+ done();\r
}\r
};\r
\r
-audio_decoder::audio_decoder(audio_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(source, context, format_desc)){}\r
-std::shared_ptr<core::audio_buffer> audio_decoder::poll(){return impl_->poll();}\r
+audio_decoder::audio_decoder(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
+ : impl_(new implementation(source, target, context, format_desc))\r
+{\r
+}\r
+\r
+int64_t audio_decoder::nb_frames() const\r
+{\r
+ return 0;\r
+}\r
\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
+#include "../util.h"\r
+\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
#include <vector>\r
\r
struct AVPacket;\r
\r
}\r
\r
-#include <agents.h>\r
-\r
namespace ffmpeg {\r
\r
class audio_decoder : boost::noncopyable\r
{\r
public:\r
-	typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;
 
-	explicit audio_decoder(source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);
+	typedef safe_ptr<AVPacket> source_element_t;
+	typedef safe_ptr<core::audio_buffer> target_element_t;
+
+	// Non-reference typedefs: the constructor takes source_t&/target_t&, so a
+	// reference typedef here would form a reference-to-reference. This also
+	// matches how frame_muxer2 declares its source/target typedefs.
+	typedef Concurrency::ISource<source_element_t> source_t;
+	typedef Concurrency::ITarget<target_element_t> target_t;
\r
- std::shared_ptr<core::audio_buffer> poll();\r
+ explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
\r
+ int64_t nb_frames() const;\r
+\r
private:\r
+ \r
struct implementation;\r
safe_ptr<implementation> impl_;\r
};\r
{ \r
std::shared_ptr<ReSampleContext> resampler_;\r
\r
- std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> copy_buffer_;\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer2_;\r
\r
const size_t output_channels_;\r
L" audio_channels:" << input_channels <<\r
L" sample_fmt:" << input_sample_format;\r
\r
- if(resampler)\r
- resampler_.reset(resampler, audio_resample_close);\r
- else\r
- BOOST_THROW_EXCEPTION(caspar_exception());\r
+ CASPAR_VERIFY(resampler, caspar_exception());\r
+\r
+ resampler_.reset(resampler, audio_resample_close);\r
} \r
}\r
\r
#include "util.h"\r
#include "audio/audio_decoder.h"\r
#include "video/video_decoder.h"\r
+#include "../ffmpeg_error.h"\r
\r
#include <common/env.h>\r
#include <common/utility/assert.h>\r
#include <boost/range/algorithm/find_if.hpp>\r
#include <boost/range/algorithm/find.hpp>\r
\r
-#include <ppl.h>\r
-\r
#include <agents.h>\r
\r
+#include <iterator>\r
+#include <vector>\r
+#include <string>\r
+\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
- \r
+ \r
struct ffmpeg_producer : public core::frame_producer\r
-{\r
- const safe_ptr<diagnostics::graph> graph_;\r
-\r
- const std::wstring filename_;\r
+{ \r
+ const std::wstring filename_;\r
+ const int start_;\r
+ const bool loop_;\r
+ const size_t length_;\r
\r
- boost::timer frame_timer_;\r
- boost::timer video_timer_;\r
- boost::timer audio_timer_;\r
+ call<input::target_element_t> throw_away_;\r
+ unbounded_buffer<input::target_element_t> packets_;\r
+ std::shared_ptr<unbounded_buffer<frame_muxer2::video_source_element_t>> video_;\r
+ std::shared_ptr<unbounded_buffer<frame_muxer2::audio_source_element_t>> audio_;\r
+ unbounded_buffer<frame_muxer2::target_element_t> frames_;\r
+ \r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
- const core::video_format_desc format_desc_;\r
- \r
- unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
- unbounded_buffer<safe_ptr<AVPacket>> video_packets_;\r
- unbounded_buffer<safe_ptr<AVPacket>> audio_packets_;\r
- unbounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
-\r
- input input_; \r
- video_decoder video_decoder_;\r
- audio_decoder audio_decoder_; \r
- double fps_;\r
- frame_muxer muxer_;\r
-\r
- const int start_;\r
- const bool loop_;\r
- const size_t length_;\r
-\r
- safe_ptr<core::basic_frame> last_frame_;\r
-\r
- const size_t width_;\r
- const size_t height_;\r
- bool is_progressive_;\r
+ input input_; \r
+ std::unique_ptr<video_decoder> video_decoder_;\r
+ std::unique_ptr<audio_decoder> audio_decoder_; \r
+ std::unique_ptr<frame_muxer2> muxer_;\r
\r
+ safe_ptr<core::basic_frame> last_frame_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
: filename_(filename)\r
- , frame_factory_(frame_factory) \r
- , format_desc_(frame_factory->get_video_format_desc())\r
- , input_(packets_, graph_, filename_, loop, start, length)\r
- , video_decoder_(video_packets_, input_.context(), frame_factory)\r
- , audio_decoder_(audio_packets_, input_.context(), frame_factory->get_video_format_desc())\r
- , fps_(video_decoder_.fps())\r
- , muxer_(frames_, fps_, frame_factory, filter)\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
+ , throw_away_([](const input::target_element_t&){})\r
+ , input_(packets_, graph_, filename_, loop, start, length)\r
, last_frame_(core::basic_frame::empty())\r
- , width_(video_decoder_.width())\r
- , height_(video_decoder_.height())\r
- , is_progressive_(true)\r
- {\r
- graph_->add_guide("frame-time", 0.5);\r
- graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
+ { \r
+ try\r
+ {\r
+ auto video = std::make_shared<unbounded_buffer<frame_muxer2::video_source_element_t>>();\r
+ video_decoder_.reset(new video_decoder(packets_, *video, *input_.context()));\r
+ video_ = video;\r
+ }\r
+ catch(averror_stream_not_found&)\r
+ {\r
+ CASPAR_LOG(warning) << "No video-stream found. Running without video."; \r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ CASPAR_LOG(warning) << "Failed to open video-stream. Running without video."; \r
+ }\r
+\r
+	try
+	{
+		auto audio = std::make_shared<unbounded_buffer<frame_muxer2::audio_source_element_t>>();
+		audio_decoder_.reset(new audio_decoder(packets_, *audio, *input_.context(), frame_factory->get_video_format_desc()));
+		audio_ = audio;
+	}
+	catch(averror_stream_not_found&)
+	{
+		// Fixed copy/paste typo from the video branch: this is the AUDIO stream.
+		CASPAR_LOG(warning) << "No audio-stream found. Running without audio."; 
+	}
+	catch(...)
+	{
+		CASPAR_LOG_CURRENT_EXCEPTION();
+		CASPAR_LOG(warning) << "Failed to open audio-stream. Running without audio."; 
+	}	
+\r
+ CASPAR_VERIFY(video_decoder_ || audio_decoder_, ffmpeg_error());\r
+ \r
+ packets_.link_target(&throw_away_);\r
+ muxer_.reset(new frame_muxer2(video_.get(), audio_.get(), frames_, video_decoder_ ? video_decoder_->fps() : frame_factory->get_video_format_desc().fps, frame_factory));\r
+ \r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
+ graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
+\r
+ input_.start();\r
}\r
\r
~ffmpeg_producer()\r
{\r
- input_.stop();\r
+ input_.stop(); \r
}\r
- \r
+ \r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
auto frame = core::basic_frame::late();\r
\r
- frame_timer_.restart();\r
- \r
- for(int n = 0; n < 64 && !try_receive(frames_, frame); ++n)\r
- decode_frame(hints);\r
- \r
- graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
-\r
- if(frame != core::basic_frame::late())\r
- last_frame_ = frame; \r
- else\r
- {\r
- if(input_.eof())\r
- return core::basic_frame::eof();\r
- else \r
- graph_->add_tag("underflow"); \r
+ try\r
+ { \r
+ frame = last_frame_ = Concurrency::receive(frames_, 10).first;\r
+ graph_->set_text(narrow(print()));\r
+ }\r
+ catch(operation_timed_out&)\r
+ { \r
+ graph_->add_tag("underflow"); \r
}\r
\r
- graph_->set_text(narrow(print()));\r
- \r
return frame;\r
}\r
\r
return disable_audio(last_frame_);\r
}\r
\r
- void decode_frame(int hints)\r
- {\r
- if(!muxer_.need_video())\r
- {\r
- std::shared_ptr<AVFrame> video;\r
- while(!video)\r
- {\r
- auto pkt = create_packet();\r
- if(try_receive(packets_, pkt))\r
- {\r
- send(video_packets_, pkt);\r
- send(audio_packets_, pkt);\r
- }\r
- video = video_decoder_.poll();\r
- Context::Yield();\r
- }\r
- is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
- muxer_.push(make_safe_ptr(video), hints); \r
- }\r
-\r
- Context::Yield();\r
- \r
- if(!muxer_.need_audio())\r
- {\r
- std::shared_ptr<core::audio_buffer> audio;\r
- while(!audio)\r
- {\r
- auto pkt = create_packet();\r
- if(try_receive(packets_, pkt))\r
- {\r
- send(video_packets_, pkt);\r
- send(audio_packets_, pkt);\r
- }\r
- audio = audio_decoder_.poll();\r
- Context::Yield();\r
- }\r
- muxer_.push(make_safe_ptr(audio)); \r
- }\r
- }\r
-\r
-	virtual int64_t nb_frames() const 
+	virtual int64_t nb_frames() const
 	{
 		if(loop_)
 			return std::numeric_limits<int64_t>::max();
 		int64_t nb_frames = input_.nb_frames();
 		if(input_.nb_loops() < 1) // input still hasn't counted all frames
 		{
-			int64_t video_nb_frames = video_decoder_.nb_frames();
+			// Either decoder may be null: the constructor only verifies that at
+			// least one of them opened (CASPAR_VERIFY(video_decoder_ || audio_decoder_)),
+			// so guard each dereference.
+			int64_t video_nb_frames = video_decoder_ ? video_decoder_->nb_frames() : 0;
+			int64_t audio_nb_frames = audio_decoder_ ? audio_decoder_->nb_frames() : 0;
 
-			nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, video_nb_frames));
+			nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, std::max(video_nb_frames, audio_nb_frames)));
 		}
 
-		nb_frames = muxer_.calc_nb_frames(nb_frames);
+		nb_frames = muxer_->calc_nb_frames(nb_frames);
+		
+		return nb_frames - start_;
+	}
\r
-	// TODO: Might need to scale nb_frames av frame_muxer transformations.
+	// Runtime parameter interface, e.g. "LOOP 1" toggles input looping.
+	virtual void param(const std::wstring& param)
+	{
+		// Tokenize the parameter string on whitespace.
+		typedef std::istream_iterator<std::wstring, wchar_t, std::char_traits<wchar_t>> wistream_iterator;
+		std::wstringstream str(param);
+		std::vector<std::wstring> params;
+		std::copy(wistream_iterator(str), wistream_iterator(), std::back_inserter(params));

-		return nb_frames - start_;
+		// Bounds-checked: .at() would throw std::out_of_range on an empty or
+		// single-token parameter string; ignore malformed input instead.
+		if(params.size() >= 2 && boost::iequals(params[0], L"LOOP"))
+			input_.loop(boost::lexical_cast<bool>(params[1]));
 	}
\r
virtual std::wstring print() const\r
{\r
- return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
- + boost::lexical_cast<std::wstring>(width_) + L"x" + boost::lexical_cast<std::wstring>(height_)\r
- + (is_progressive_ ? L"p" : L"i") + boost::lexical_cast<std::wstring>(is_progressive_ ? fps_ : 2.0 * fps_)\r
- + L"]";\r
+ if(video_decoder_)\r
+ {\r
+ return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
+ + boost::lexical_cast<std::wstring>(video_decoder_->width()) + L"x" + boost::lexical_cast<std::wstring>(video_decoder_->height())\r
+ + (video_decoder_->is_progressive() ? L"p" : L"i") + boost::lexical_cast<std::wstring>(video_decoder_->is_progressive() ? video_decoder_->fps() : 2.0 * video_decoder_->fps())\r
+ + L"]";\r
+ }\r
+ \r
+ return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"]";\r
}\r
};\r
\r
\r
#include <core/producer/frame_producer.h>\r
#include <core/producer/frame/basic_frame.h>\r
-#include <core/producer/frame/frame_transform.h>\r
-#include <core/producer/frame/pixel_format.h>\r
#include <core/producer/frame/frame_factory.h>\r
#include <core/mixer/write_frame.h>\r
\r
+#include <common/concurrency/governor.h>\r
#include <common/env.h>\r
#include <common/exception/exceptions.h>\r
#include <common/log/log.h>\r
\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <boost/foreach.hpp>\r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-\r
-#include <deque>\r
-#include <queue>\r
-#include <vector>\r
+#include <agents.h>\r
\r
-using namespace caspar::core;\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct frame_muxer::implementation : boost::noncopyable\r
-{ \r
- frame_muxer::target_t& target_;\r
- std::deque<std::queue<safe_ptr<write_frame>>> video_streams_;\r
- std::deque<core::audio_buffer> audio_streams_;\r
- std::deque<safe_ptr<basic_frame>> frame_buffer_;\r
- display_mode::type display_mode_;\r
- const double in_fps_;\r
- const video_format_desc format_desc_;\r
- bool auto_transcode_;\r
-\r
- size_t audio_sample_count_;\r
- size_t video_frame_count_;\r
- \r
- size_t processed_audio_sample_count_;\r
- size_t processed_video_frame_count_;\r
+struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
+{ \r
+ frame_muxer2::video_source_t* video_source_;\r
+ frame_muxer2::audio_source_t* audio_source_;\r
+\r
+ ITarget<frame_muxer2::target_element_t>& target_;\r
+ mutable single_assignment<display_mode::type> display_mode_;\r
+ const double in_fps_;\r
+ const core::video_format_desc format_desc_;\r
+ const bool auto_transcode_;\r
+ \r
+ mutable single_assignment<safe_ptr<filter>> filter_;\r
+ const safe_ptr<core::frame_factory> frame_factory_;\r
+ \r
+ core::audio_buffer audio_data_;\r
+ \r
+ std::queue<frame_muxer2::video_source_element_t> video_frames_;\r
+ std::queue<frame_muxer2::audio_source_element_t> audio_buffers_;\r
\r
- filter filter_;\r
- safe_ptr<core::frame_factory> frame_factory_;\r
- std::wstring filter_str_;\r
- \r
- implementation(frame_muxer::target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
- : target_(target)\r
- , video_streams_(1)\r
- , audio_streams_(1)\r
- , display_mode_(display_mode::invalid)\r
+ std::wstring filter_str_;\r
+\r
+ governor governor_;\r
+ tbb::atomic<bool> is_running_;\r
+ \r
+ implementation(frame_muxer2::video_source_t* video_source,\r
+ frame_muxer2::audio_source_t* audio_source,\r
+ frame_muxer2::target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory,\r
+ const std::wstring& filter)\r
+ : video_source_(video_source)\r
+ , audio_source_(audio_source)\r
+ , target_(target)\r
, in_fps_(in_fps)\r
, format_desc_(frame_factory->get_video_format_desc())\r
, auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
- , audio_sample_count_(0)\r
- , video_frame_count_(0)\r
, frame_factory_(frame_factory)\r
- , filter_str_(filter_str)\r
- {\r
+ , governor_(2)\r
+ { \r
+ is_running_ = true;\r
+ start();\r
}\r
\r
- void push(const safe_ptr<AVFrame>& video_frame, int hints)\r
- { \r
- if(video_frame == loop_video())\r
- { \r
- CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
- video_frame_count_ = 0;\r
- video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
- return;\r
- }\r
- \r
- if(display_mode_ == display_mode::invalid)\r
- initialize(*video_frame);\r
- \r
- if(hints & core::frame_producer::ALPHA_HINT)\r
- video_frame->format = make_alpha_format(video_frame->format);\r
- \r
- auto format = video_frame->format;\r
- if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
- video_frame->format = PIX_FMT_GRAY8;\r
+ ~implementation()\r
+ {\r
+ is_running_ = false;\r
+ governor_.cancel();\r
+ agent::wait(this);\r
+ }\r
+ \r
+ safe_ptr<core::write_frame> receive_video()\r
+ { \r
+ if(!video_source_)\r
+ return make_safe<core::write_frame>(this); \r
\r
- filter_.push(video_frame);\r
- BOOST_FOREACH(auto& av_frame, filter_.poll_all())\r
+ if(!video_frames_.empty())\r
{\r
- av_frame->format = format;\r
- \r
- video_streams_.back().push(make_write_frame(this, av_frame, frame_factory_, hints));\r
- ++video_frame_count_;\r
+ auto video_frame = std::move(video_frames_.front());\r
+ video_frames_.pop();\r
+ return make_write_frame(this, video_frame, frame_factory_, 0);\r
}\r
\r
- if(video_streams_.back().size() > 8)\r
- BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ auto video = receive(video_source_);\r
\r
- commit();\r
- }\r
+ if(video == loop_video())\r
+ return receive_video();\r
\r
- void push(const safe_ptr<core::audio_buffer>& audio_samples)\r
- {\r
- if(audio_samples == loop_audio()) \r
+ if(!is_running_ || video == eof_video())\r
{\r
- CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
- audio_streams_.push_back(core::audio_buffer());\r
- audio_sample_count_ = 0;\r
- return;\r
- }\r
-\r
- audio_sample_count_ += audio_samples->size();\r
-\r
- boost::range::push_back(audio_streams_.back(), *audio_samples);\r
-\r
- if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
- BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ is_running_ = false;\r
+ return make_safe<core::write_frame>(this); \r
+ } \r
\r
- commit();\r
- }\r
- \r
- safe_ptr<core::write_frame> pop_video()\r
- {\r
- auto frame = video_streams_.front().front();\r
- video_streams_.front().pop();\r
+ if(!display_mode_.has_value())\r
+ initialize_display_mode(*video);\r
+ \r
+ filter_.value()->push(std::move(video));\r
+ for(auto frame = filter_.value()->poll(); frame; frame = filter_.value()->poll()) \r
+ video_frames_.push(make_safe_ptr(frame)); \r
\r
- return frame;\r
- }\r
-\r
- core::audio_buffer pop_audio()\r
- {\r
- auto begin = audio_streams_.front().begin();\r
- auto end = begin + format_desc_.audio_samples_per_frame;\r
-\r
- auto samples = core::audio_buffer(begin, end);\r
- audio_streams_.front().erase(begin, end);\r
-\r
- return samples;\r
+ return receive_video();\r
}\r
\r
- bool need_video() const\r
+ safe_ptr<core::audio_buffer> receive_audio()\r
{ \r
- return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && need_video2());\r
- }\r
- \r
- bool need_audio() const\r
- {\r
- return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && need_audio2());\r
- }\r
+ if(!audio_source_)\r
+ return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
\r
- bool need_video2() const\r
- { \r
- switch(display_mode_)\r
+ if(!audio_buffers_.empty())\r
{\r
- case display_mode::deinterlace_bob_reinterlace: \r
- case display_mode::interlace: \r
- return video_streams_.front().size() >= 2;\r
- default: \r
- return !video_streams_.front().empty();\r
+ auto audio_buffer = std::move(audio_buffers_.front());\r
+ audio_buffers_.pop();\r
+ return audio_buffer;\r
}\r
- }\r
- \r
- bool need_audio2() const\r
- {\r
- switch(display_mode_)\r
+ \r
+ auto audio = receive(audio_source_);\r
+\r
+ if(audio == loop_audio())\r
{\r
- case display_mode::duplicate: \r
- return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame;\r
- default: \r
- return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame;\r
+ if(!audio_data_.empty())\r
+ {\r
+ CASPAR_LOG(info) << L"[frame_muxer] Truncating audio: " << audio_data_.size();\r
+ audio_data_.clear();\r
+ }\r
+ return receive_audio();\r
}\r
- }\r
- \r
- bool try_pop(safe_ptr<core::basic_frame>& frame)\r
- {\r
- commit();\r
\r
- if(frame_buffer_.empty())\r
- return false;\r
+ if(!is_running_ || audio == eof_audio())\r
+ {\r
+ is_running_ = false;\r
+ return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
+ } \r
+ \r
+ audio_data_.insert(audio_data_.end(), audio->begin(), audio->end()); \r
+ while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
+ {\r
+ auto begin = audio_data_.begin(); \r
+ auto end = begin + format_desc_.audio_samples_per_frame;\r
+ auto audio = make_safe<core::audio_buffer>(begin, end);\r
+ audio_data_.erase(begin, end);\r
+ audio_buffers_.push(audio);\r
+ }\r
\r
- frame = std::move(frame_buffer_.front());\r
- frame_buffer_.pop_front(); \r
- return true;\r
+ return receive_audio();\r
}\r
-\r
- void commit()\r
+ \r
+ virtual void run()\r
{\r
- if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!need_video2() || !need_audio2()))\r
- {\r
- if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
- CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
+ win32_exception::install_handler();\r
\r
- video_streams_.pop_front();\r
- audio_streams_.pop_front();\r
+ try\r
+ {\r
+ while(is_running_)\r
+ { \r
+ auto ticket = governor_.acquire();\r
+\r
+ auto video = receive_video();\r
+ video->audio_data() = std::move(*receive_audio());\r
+\r
+ if(!is_running_)\r
+ break;\r
+\r
+ switch(display_mode_.value())\r
+ {\r
+ case display_mode::simple: \r
+ case display_mode::deinterlace:\r
+ case display_mode::deinterlace_bob:\r
+ {\r
+ send(target_, frame_muxer2::target_element_t(std::move(video), ticket));\r
+\r
+ break;\r
+ }\r
+ case display_mode::duplicate: \r
+ { \r
+ auto video2 = make_safe<core::write_frame>(*video); \r
+ video2->audio_data() = std::move(*receive_audio());\r
+\r
+ send(target_, frame_muxer2::target_element_t(std::move(video), ticket)); \r
+ send(target_, frame_muxer2::target_element_t(std::move(video2), ticket));\r
+\r
+ break;\r
+ }\r
+ case display_mode::half: \r
+ { \r
+ send(target_, frame_muxer2::target_element_t(std::move(video), ticket));\r
+ receive_video();\r
+\r
+ break;\r
+ }\r
+ case display_mode::deinterlace_bob_reinterlace:\r
+ case display_mode::interlace: \r
+ { \r
+ auto video2 = receive_video();\r
+ \r
+ auto frame = core::basic_frame::interlace(std::move(video), std::move(video2), format_desc_.field_mode); \r
+ send(target_, frame_muxer2::target_element_t(std::move(frame), ticket));\r
+\r
+ break;\r
+ }\r
+ default: \r
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+ }\r
+ } \r
}\r
-\r
- if(!need_video2() || !need_audio2())\r
- return;\r
- \r
- switch(display_mode_)\r
+ catch(...)\r
{\r
- case display_mode::simple:\r
- case display_mode::deinterlace_bob:\r
- case display_mode::deinterlace: \r
- return simple();\r
- case display_mode::duplicate: \r
- return duplicate();\r
- case display_mode::half: \r
- return half();\r
- case display_mode::interlace:\r
- case display_mode::deinterlace_bob_reinterlace: \r
- return interlace();\r
- default: \r
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
- }\r
- \r
- void simple()\r
- { \r
- auto frame1 = pop_video();\r
- frame1->audio_data() = pop_audio();\r
\r
- send(target_, safe_ptr<basic_frame>(frame1)); \r
- }\r
+ send(target_, frame_muxer2::target_element_t(core::basic_frame::eof(), ticket_t()));\r
\r
- void duplicate()\r
- { \r
- auto frame = pop_video();\r
-\r
- auto frame1 = make_safe<core::write_frame>(*frame); // make a copy\r
- frame1->audio_data() = pop_audio();\r
-\r
- auto frame2 = frame;\r
- frame2->audio_data() = pop_audio();\r
- \r
- send(target_, safe_ptr<basic_frame>(frame1)); \r
- send(target_, safe_ptr<basic_frame>(frame2)); \r
+ done();\r
}\r
\r
- void half()\r
- { \r
- auto frame1 = pop_video();\r
- frame1->audio_data() = pop_audio();\r
- \r
- video_streams_.front().pop(); // Throw away\r
- \r
- send(target_, safe_ptr<basic_frame>(frame1)); \r
- }\r
- \r
- void interlace()\r
- { \r
- auto frame1 = pop_video();\r
- frame1->audio_data() = pop_audio();\r
- \r
- auto frame2 = pop_video();\r
- \r
- send(target_, core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); \r
- }\r
- \r
- void initialize(AVFrame& frame)\r
+ void initialize_display_mode(AVFrame& frame)\r
{\r
auto display_mode = display_mode::invalid;\r
\r
display_mode = display_mode::simple;\r
}\r
\r
- filter_ = filter(filter_str_);\r
- display_mode_ = display_mode;\r
+ send(filter_, make_safe<filter>(filter_str_));\r
\r
CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode);\r
- }\r
- \r
\r
+ send(display_mode_, display_mode);\r
+ }\r
+ \r
int64_t calc_nb_frames(int64_t nb_frames) const\r
{\r
- switch(display_mode_) // Take into account transformation in run.\r
+ switch(display_mode_.value()) // Take into account transformation in run.\r
{\r
case display_mode::deinterlace_bob_reinterlace:\r
case display_mode::interlace: \r
break;\r
}\r
\r
- if(is_double_rate(widen(filter_.filter_str()))) // Take into account transformations in filter.\r
+ if(is_double_rate(widen(filter_.value()->filter_str()))) // Take into account transformations in filter.\r
nb_frames *= 2;\r
\r
return nb_frames;\r
}\r
};\r
\r
-frame_muxer::frame_muxer(target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
- : impl_(new implementation(target, in_fps, frame_factory, filter_str)){}\r
-void frame_muxer::push(const safe_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
-void frame_muxer::push(const safe_ptr<core::audio_buffer>& audio_samples){return impl_->push(audio_samples);}\r
-bool frame_muxer::need_video() const{return impl_->need_video();}\r
-bool frame_muxer::need_audio() const{return impl_->need_audio();}\r
-int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
+frame_muxer2::frame_muxer2(video_source_t* video_source, \r
+ audio_source_t* audio_source,\r
+ target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory,\r
+ const std::wstring& filter)\r
+ : impl_(new implementation(video_source, audio_source, target, in_fps, frame_factory, filter))\r
+{\r
+}\r
+\r
+int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
+{\r
+ return impl_->calc_nb_frames(nb_frames);\r
+}\r
\r
}}
\ No newline at end of file
#pragma once\r
\r
+#include "util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
+\r
#include <vector>\r
\r
struct AVFrame;\r
\r
}\r
\r
-#include <agents.h>\r
-\r
namespace ffmpeg {\r
\r
-class frame_muxer : boost::noncopyable\r
+class frame_muxer2 : boost::noncopyable\r
{\r
public:\r
- typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
-\r
- frame_muxer(target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str);\r
\r
- void push(const safe_ptr<AVFrame>& video_frame, int hints = 0);\r
- void push(const safe_ptr<core::audio_buffer>& audio_samples);\r
+ typedef safe_ptr<AVFrame> video_source_element_t;\r
+ typedef safe_ptr<core::audio_buffer> audio_source_element_t;\r
+ typedef std::pair<safe_ptr<core::basic_frame>, ticket_t> target_element_t;\r
+\r
+ typedef Concurrency::ISource<video_source_element_t> video_source_t;\r
+ typedef Concurrency::ISource<audio_source_element_t> audio_source_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
+ \r
+ frame_muxer2(video_source_t* video_source,\r
+ audio_source_t* audio_source, \r
+ target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory,\r
+ const std::wstring& filter = L"");\r
\r
- bool need_video() const;\r
- bool need_audio() const; \r
-\r
int64_t calc_nb_frames(int64_t nb_frames) const;\r
private:\r
struct implementation;\r
#include "../stdafx.h"\r
\r
#include "input.h"\r
-\r
#include "util.h"\r
#include "../ffmpeg_error.h"\r
\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
-#include <tbb/concurrent_queue.h>\r
#include <tbb/atomic.h>\r
\r
-#include <boost/range/algorithm.hpp>\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-#include <boost/thread/thread.hpp>\r
+#include <agents.h>\r
+#include <concrt_extras.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <concrt_extras.h>\r
\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-static const size_t MAX_BUFFER_SIZE = 16 * 1000000;\r
-static const size_t MAX_BUFFER_COUNT = 100;\r
+static const size_t MAX_PACKETS_SIZE = 16 * 1000000;\r
+static const size_t MAX_PACKETS_COUNT = 50;\r
\r
-struct input::implementation : public agent, boost::noncopyable\r
-{ \r
- const safe_ptr<AVFormatContext> format_context_; \r
- const safe_ptr<diagnostics::graph> graph_;\r
+struct input::implementation : public Concurrency::agent, boost::noncopyable\r
+{\r
+ input::target_t& target_;\r
\r
- input::target_t& target_;\r
+ const std::wstring filename_;\r
+ const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
+ int default_stream_index_;\r
+ const boost::iterator_range<AVStream**> streams_;\r
\r
- const int default_stream_index_;\r
- \r
- const std::wstring filename_;\r
- const bool loop_;\r
- const size_t start_; \r
- const size_t length_;\r
- size_t frame_number_;\r
- \r
- tbb::atomic<size_t> buffer_size_;\r
- tbb::atomic<size_t> buffer_count_;\r
- Concurrency::event event_;\r
+ safe_ptr<diagnostics::graph> graph_;\r
\r
- tbb::atomic<bool> is_running_;\r
+ tbb::atomic<bool> loop_;\r
+ const size_t start_; \r
+ const size_t length_;\r
+ size_t frame_number_;\r
+ \r
+ tbb::atomic<size_t> nb_frames_;\r
+ tbb::atomic<size_t> nb_loops_; \r
+ tbb::atomic<size_t> packets_count_;\r
+ tbb::atomic<size_t> packets_size_;\r
\r
- tbb::atomic<size_t> nb_frames_;\r
- tbb::atomic<size_t> nb_loops_;\r
+ tbb::atomic<bool> is_running_;\r
\r
+ Concurrency::event event_;\r
+ \r
public:\r
- explicit implementation(input::target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
- : format_context_(open_input(filename))\r
- , graph_(graph)\r
- , target_(target)\r
- , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
- , loop_(loop)\r
+ explicit implementation(input::target_t& target,\r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start,\r
+ size_t length)\r
+ : target_(target)\r
, filename_(filename)\r
+ , format_context_(open_input(filename)) \r
+ , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
+ , streams_(format_context_->streams, format_context_->streams + format_context_->nb_streams)\r
+ , graph_(graph)\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
- { \r
+ { \r
event_.set();\r
- \r
+ loop_ = loop;\r
+ \r
+ //av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
+ \r
if(start_ > 0) \r
seek_frame(start_);\r
\r
- graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
- graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
-\r
- agent::start();\r
+ graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+ \r
+ is_running_ = true;\r
}\r
\r
+ ~implementation()\r
+ {\r
+ if(is_running_)\r
+ stop();\r
+ }\r
+ \r
void stop()\r
{\r
is_running_ = false;\r
event_.set();\r
agent::wait(this);\r
}\r
-\r
- size_t nb_frames() const\r
- {\r
- return nb_frames_;\r
- }\r
-\r
- size_t nb_loops() const\r
- {\r
- return nb_loops_;\r
- }\r
-\r
-private:\r
\r
virtual void run()\r
- { \r
+ {\r
+ win32_exception::install_handler();\r
+\r
try\r
- { \r
- is_running_ = true;\r
- while(is_running_)\r
- {\r
- read_next_packet(); \r
- event_.wait(); \r
+ {\r
+ for(auto packet = read_next_packet(); packet && is_running_; packet = read_next_packet())\r
+ { \r
+ Concurrency::asend(target_, make_safe_ptr(packet));\r
+ Context::Yield();\r
+ event_.wait();\r
}\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- \r
- is_running_ = false;\r
+ } \r
+ \r
+ BOOST_FOREACH(auto stream, streams_)\r
+ Concurrency::send(target_, eof_packet(stream->index)); \r
+\r
done();\r
}\r
- \r
- void read_next_packet()\r
- { \r
- auto packet = create_packet();\r
\r
+ std::shared_ptr<AVPacket> read_next_packet()\r
+ { \r
+ auto packet = create_packet();\r
+ \r
int ret = [&]() -> int\r
{\r
- scoped_oversubcription_token oversubscribe;\r
+ Concurrency::scoped_oversubcription_token oversubscribe;\r
return av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
}();\r
\r
{\r
++nb_loops_;\r
frame_number_ = 0;\r
- \r
+\r
if(loop_)\r
{\r
CASPAR_LOG(trace) << print() << " Looping.";\r
else\r
{\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
- is_running_ = false;\r
+ return nullptr;\r
}\r
}\r
- else\r
- { \r
- THROW_ON_ERROR(ret, "av_read_frame", print());\r
-\r
- if(packet->stream_index == default_stream_index_)\r
- {\r
- if(nb_loops_ == 0)\r
- ++nb_frames_;\r
- ++frame_number_;\r
- }\r
\r
- THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
- \r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = packet->size;\r
- auto data = packet->data;\r
+ THROW_ON_ERROR(ret, "av_read_frame", print());\r
\r
- packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
- {\r
- packet->size = size;\r
- packet->data = data;\r
-\r
- buffer_size_ -= packet->size;\r
- --buffer_count_;\r
- event_.set();\r
- graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
- });\r
- \r
- buffer_size_ += packet->size;\r
- ++buffer_count_;\r
- if((buffer_size_ > MAX_BUFFER_SIZE || buffer_count_ > MAX_BUFFER_COUNT) && is_running_)\r
- event_.reset();\r
+ if(packet->stream_index == default_stream_index_)\r
+ {\r
+ if(nb_loops_ == 0)\r
+ ++nb_frames_;\r
+ ++frame_number_;\r
+ }\r
\r
- send(target_, packet);\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
\r
- graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
- } \r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data; \r
+ \r
+ ++packets_count_;\r
+ packets_size_ += size;\r
+\r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data;\r
+ --packets_count_;\r
+ packets_size_ -= size;\r
+ event_.set();\r
+ });\r
+\r
+ if(is_running_ && (packets_count_ > MAX_PACKETS_COUNT || packets_size_ > MAX_PACKETS_SIZE))\r
+ event_.reset();\r
+ \r
+ return packet;\r
}\r
- \r
+\r
void seek_frame(int64_t frame, int flags = 0)\r
{ \r
if(flags == AVSEEK_FLAG_BACKWARD)\r
}\r
\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
- \r
- Concurrency::asend(target_, loop_packet()); \r
+ auto packet = create_packet();\r
+ packet->size = 0;\r
+\r
+ BOOST_FOREACH(auto stream, streams_)\r
+ Concurrency::asend(target_, loop_packet(stream->index)); \r
\r
graph_->add_tag("seek"); \r
- } \r
+ } \r
\r
bool is_eof(int ret)\r
{\r
if(ret == AVERROR_EOF)\r
CASPAR_LOG(trace) << print() << " Received EOF. " << nb_frames_;\r
\r
+ CASPAR_VERIFY(ret >= 0 || ret == AVERROR_EOF || ret == AVERROR(EIO), ffmpeg_error() << source_info(narrow(print())));\r
+\r
return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF;\r
}\r
\r
}\r
};\r
\r
-input::input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length)\r
- : impl_(new implementation(target, graph, filename, loop, start, length)){}\r
-bool input::eof() const {return !impl_->is_running_;}\r
-safe_ptr<AVFormatContext> input::context(){return impl_->format_context_;}\r
-size_t input::nb_frames() const {return impl_->nb_frames();}\r
-size_t input::nb_loops() const {return impl_->nb_loops();}\r
-void input::stop(){impl_->stop();}\r
+input::input(input::target_t& target,\r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start, \r
+ size_t length)\r
+ : impl_(new implementation(target, graph, filename, loop, start, length))\r
+{\r
+}\r
+\r
+safe_ptr<AVFormatContext> input::context()\r
+{\r
+ return safe_ptr<AVFormatContext>(impl_->format_context_);\r
+}\r
+\r
+size_t input::nb_frames() const\r
+{\r
+ return impl_->nb_frames_;\r
+}\r
+\r
+size_t input::nb_loops() const \r
+{\r
+ return impl_->nb_loops_;\r
+}\r
+\r
+void input::start()\r
+{\r
+ impl_->start();\r
+}\r
+\r
+void input::stop()\r
+{\r
+ impl_->stop();\r
+}\r
+\r
+bool input::loop() const\r
+{\r
+ return impl_->loop_;\r
+}\r
+void input::loop(bool value)\r
+{\r
+ impl_->loop_ = value;\r
+}\r
+\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
+#include "util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
+\r
+#include <agents.h>\r
+#include <concrt.h>\r
\r
#include <memory>\r
#include <string>\r
\r
#include <boost/noncopyable.hpp>\r
-\r
-#include <agents.h>\r
+#include <boost/range/iterator_range.hpp>\r
\r
struct AVFormatContext;\r
struct AVPacket;\r
}\r
\r
namespace ffmpeg {\r
-\r
+ \r
class input : boost::noncopyable\r
{\r
public:\r
-	typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
-\r
-	explicit input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
-\r
-	bool eof() const;\r
+	\r
+	typedef safe_ptr<AVPacket> target_element_t;\r
\r
-	void stop();\r
+	typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
+	explicit input(target_t& target, \r
+				   const safe_ptr<diagnostics::graph>& graph, \r
+				   const std::wstring& filename, bool loop, \r
+				   size_t start = 0, \r
+				   size_t length = std::numeric_limits<size_t>::max());\r
+	\r
	size_t nb_frames() const;\r
	size_t nb_loops() const;\r
-\r
+	\r
	safe_ptr<AVFormatContext> context();\r
+\r
+	bool loop() const;\r
+	void loop(bool value);\r
+\r
+	void start();\r
+	void stop();\r
private:\r
+	friend struct implementation;\r
	struct implementation;\r
	std::shared_ptr<implementation> impl_;\r
};\r
\r
// Dataflow\r
\r
-safe_ptr<AVPacket> loop_packet(int index = 0);\r
-safe_ptr<AVPacket> eof_packet(int index = 0);\r
+safe_ptr<AVPacket> loop_packet(int index);\r
+safe_ptr<AVPacket> eof_packet(int index);\r
safe_ptr<AVFrame> loop_video();\r
safe_ptr<AVFrame> eof_video();\r
safe_ptr<core::audio_buffer> loop_audio();\r
\r
#include "../../ffmpeg_error.h"\r
\r
-#include <core/producer/frame/frame_transform.h>\r
-#include <core/producer/frame/frame_factory.h>\r
-\r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-#include <boost/filesystem.hpp>\r
-\r
-#include <queue>\r
+#include <core/producer/frame/basic_frame.h>\r
+#include <common/memory/memcpy.h>\r
+#include <common/concurrency/governor.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
+#include <tbb/scalable_allocator.h>\r
+\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct video_decoder::implementation : boost::noncopyable\r
-{\r
- video_decoder::source_t& source_;\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
+{ \r
int index_;\r
- const safe_ptr<AVCodecContext> codec_context_;\r
+ std::shared_ptr<AVCodecContext> codec_context_;\r
+ \r
+ double fps_;\r
+ int64_t nb_frames_;\r
\r
- std::queue<safe_ptr<AVPacket>> packets_;\r
+ size_t width_;\r
+ size_t height_;\r
+ bool is_progressive_;\r
\r
- const double fps_;\r
- const int64_t nb_frames_;\r
- const size_t width_;\r
- const size_t height_;\r
+ unbounded_buffer<video_decoder::source_element_t> source_;\r
+ ITarget<video_decoder::target_element_t>& target_;\r
+\r
+ governor governor_;\r
+ tbb::atomic<bool> is_running_;\r
\r
public:\r
- explicit implementation(video_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) \r
- : source_(source)\r
- , frame_factory_(frame_factory)\r
- , codec_context_(open_codec(*context, AVMEDIA_TYPE_VIDEO, index_))\r
+ explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
+ : codec_context_(open_codec(context, AVMEDIA_TYPE_VIDEO, index_))\r
, fps_(static_cast<double>(codec_context_->time_base.den) / static_cast<double>(codec_context_->time_base.num))\r
- , nb_frames_(context->streams[index_]->nb_frames)\r
+ , nb_frames_(context.streams[index_]->nb_frames)\r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
- { \r
- }\r
- \r
- std::shared_ptr<AVFrame> poll()\r
+ , is_progressive_(true)\r
+ , source_([this](const video_decoder::source_element_t& packet){return packet->stream_index == index_;})\r
+ , target_(target)\r
+ , governor_(2)\r
{ \r
- auto packet = create_packet();\r
+ CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
- if(packets_.empty())\r
- {\r
- if(!try_receive(source_, packet) || packet->stream_index != index_)\r
- return nullptr;\r
- else\r
- packets_.push(packet);\r
- }\r
+ source.link_target(&source_);\r
\r
- packet = packets_.front();\r
-\r
- std::shared_ptr<AVFrame> video;\r
-\r
- if(packet == loop_packet())\r
- {\r
- if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
- {\r
- AVPacket pkt;\r
- av_init_packet(&pkt);\r
- pkt.data = nullptr;\r
- pkt.size = 0;\r
-\r
- video = decode(pkt);\r
- if(video)\r
- packets_.push(packet);\r
- }\r
+ is_running_ = true;\r
+ start();\r
+ }\r
\r
- if(!video)\r
- {\r
- avcodec_flush_buffers(codec_context_.get());\r
- video = loop_video();\r
- }\r
- } \r
- else\r
- video = decode(*packet);\r
-\r
- if(packet->size == 0)\r
- packets_.pop();\r
- \r
- return video;\r
+ ~implementation()\r
+ {\r
+ is_running_ = false;\r
+ governor_.cancel();\r
+ agent::wait(this);\r
}\r
\r
-	std::shared_ptr<AVFrame> decode(AVPacket& pkt)\r
+	std::shared_ptr<AVFrame> decode(AVPacket& packet)\r
	{\r
		std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
\r
		int frame_finished = 0;\r
-		THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &pkt), "[video_decocer]");\r
-		\r
+		THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &packet), "[video_decoder]");\r
+\r
+		// 1 packet <=> 1 frame.\r
		// If a decoder consumes less then the whole packet then something is wrong\r
		// that might be just harmless padding at the end, or a problem with the\r
		// AVParser or demuxer which puted more then one frame in a AVPacket.\r
-		pkt.data = nullptr;\r
-		pkt.size = 0;\r
\r
		if(frame_finished == 0) \r
			return nullptr;\r
-\r
+		\r
		if(decoded_frame->repeat_pict > 0)\r
			CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
-		\r
+\r
		return decoded_frame;\r
	}\r
- \r
+\r
+ virtual void run()\r
+ {\r
+ win32_exception::install_handler();\r
+\r
+ try\r
+ {\r
+ while(is_running_)\r
+ {\r
+ auto ticket = governor_.acquire();\r
+ auto packet = receive(source_);\r
+ \r
+ if(packet == loop_packet(index_))\r
+ { \r
+ if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ {\r
+ AVPacket pkt;\r
+ av_init_packet(&pkt);\r
+ pkt.data = nullptr;\r
+ pkt.size = 0;\r
+\r
+ for(auto decoded_frame = decode(pkt); decoded_frame; decoded_frame = decode(pkt))\r
+ {\r
+ auto frame = dup_frame(make_safe_ptr(decoded_frame)); \r
+ send(target_, safe_ptr<AVFrame>(frame.get(), [frame, ticket](AVFrame*){}));\r
+ Context::Yield();\r
+ }\r
+ }\r
+\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ send(target_, loop_video());\r
+ continue;\r
+ }\r
+\r
+ if(packet == eof_packet(index_))\r
+ break;\r
+\r
+ auto decoded_frame = decode(*packet);\r
+ if(!decoded_frame)\r
+ continue;\r
+ \r
+ is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+ \r
+ // C-TODO: Avoid duplication.\r
+	// Need to duplicate frame data since avcodec_decode_video2 reuses it.\r
+ auto frame = dup_frame(make_safe_ptr(decoded_frame));\r
+ send(target_, safe_ptr<AVFrame>(frame.get(), [frame, ticket](AVFrame*){})); \r
+ Context::Yield();\r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
+ \r
+ send(target_, eof_video());\r
+\r
+ done();\r
+ }\r
+\r
+ safe_ptr<AVFrame> dup_frame(const safe_ptr<AVFrame>& frame)\r
+ {\r
+ auto desc = get_pixel_format_desc(static_cast<PixelFormat>(frame->format), frame->width, frame->height);\r
+\r
+ auto count = desc.planes.size();\r
+ std::array<uint8_t*, 4> org_ptrs;\r
+ std::array<safe_ptr<uint8_t>, 4> new_ptrs;\r
+ parallel_for<size_t>(0, count, [&](size_t n)\r
+ {\r
+ CASPAR_ASSERT(frame->data[n]);\r
+ auto size = frame->linesize[n]*desc.planes[n].height;\r
+ new_ptrs[n] = fast_memdup(frame->data[n], size);\r
+ org_ptrs[n] = frame->data[n];\r
+ frame->data[n] = new_ptrs[n].get();\r
+ });\r
+\r
+ return safe_ptr<AVFrame>(frame.get(), [frame, org_ptrs, new_ptrs, count](AVFrame*)\r
+ {\r
+ for(size_t n = 0; n < count; ++n)\r
+ frame->data[n] = org_ptrs[n];\r
+ });\r
+ }\r
+ \r
double fps() const\r
{\r
return fps_;\r
}\r
};\r
\r
-video_decoder::video_decoder(video_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) : impl_(new implementation(source, context, frame_factory)){}\r
-std::shared_ptr<AVFrame> video_decoder::poll(){return impl_->poll();}\r
+video_decoder::video_decoder(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
+ : impl_(new implementation(source, target, context))\r
+{\r
+}\r
+\r
double video_decoder::fps() const{return impl_->fps();}\r
int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
size_t video_decoder::width() const{return impl_->width_;}\r
size_t video_decoder::height() const{return impl_->height_;}\r
+bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
+#include "../util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <core/video_format.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
+\r
#include <vector>\r
\r
struct AVFormatContext;\r
\r
namespace core {\r
struct frame_factory;\r
- class write_frame;\r
}\r
\r
-#include <agents.h>\r
-\r
namespace ffmpeg {\r
\r
class video_decoder : boost::noncopyable\r
{\r
public:\r
- typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
-\r
- explicit video_decoder(source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory);\r
\r
- std::shared_ptr<AVFrame> poll();\r
+ typedef safe_ptr<AVPacket> source_element_t;\r
+ typedef safe_ptr<AVFrame> target_element_t;\r
+\r
+ typedef Concurrency::ISource<source_element_t> source_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
+ explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
+\r
size_t width() const;\r
size_t height() const;\r
\r
int64_t nb_frames() const;\r
+ bool is_progressive() const;\r
\r
double fps() const;\r
private:\r
</producers>\r
<channels>\r
<channel>\r
- <video-mode>1080p5000</video-mode>\r
+ <video-mode>1080i5000</video-mode>\r
<consumers>\r
<decklink>\r
<device>1</device>\r