\r
void __cdecl destroy_producer(LPVOID lpParam)\r
{\r
+ static Concurrency::critical_section mutex;\r
auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
\r
try\r
{ \r
if(destruction->producer.unique())\r
{\r
+ {\r
+ Concurrency::critical_section::scoped_lock lock(mutex);\r
+ Concurrency::wait(100);\r
+ }\r
Concurrency::scoped_oversubcription_token oversubscribe;\r
CASPAR_LOG(info) << "Destroying: " << destruction->producer->print();\r
destruction->producer.reset();\r
public:\r
implementation(int index, const video_format_desc& format_desc, ogl_device& ogl) \r
: format_desc_(format_desc)\r
- , governor_(3)\r
+ , governor_(2)\r
, output_(new caspar::core::output(mixer_frames_, format_desc))\r
, mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
, stage_(new caspar::core::stage(stage_frames_, governor_)) \r
#include "util/util.h"\r
\r
#include "consumer/decklink_consumer.h"\r
-#include "producer/decklink_producer.h"\r
+//#include "producer/decklink_producer.h"\r
\r
#include <core/consumer/frame_consumer.h>\r
#include <core/producer/frame_producer.h>\r
void init()\r
{\r
core::register_consumer_factory([](const std::vector<std::wstring>& params){return create_consumer(params);});\r
- core::register_producer_factory(create_producer);\r
+ //core::register_producer_factory(create_producer);\r
}\r
\r
std::wstring get_version() \r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>\r
</ClCompile>\r
<ClCompile Include="StdAfx.cpp">\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">Create</PrecompiledHeader>\r
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sun Oct 23 22:39:56 2011\r
+/* at Wed Oct 26 20:20:42 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sun Oct 23 22:39:56 2011\r
+/* at Wed Oct 26 20:20:42 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
try\r
{\r
- auto frame_element = Concurrency::receive(muxed_frames_);\r
- last_frame_ = frame = frame_element.first;\r
+ last_frame_ = frame = Concurrency::receive(muxed_frames_, 10);\r
}\r
catch(Concurrency::operation_timed_out&)\r
{ \r
Concurrency::parallel_invoke(\r
[&]\r
{\r
- Concurrency::send(video_frames_, ffmpeg::frame_muxer2::video_source_element_t(av_frame, ticket_t())); \r
+ Concurrency::send(video_frames_, av_frame); \r
},\r
[&]\r
{ \r
{\r
auto sample_frame_count = audio->GetSampleFrameCount();\r
auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
- Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), ticket_t()));\r
+ Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
}\r
else\r
- Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0), ticket_t())); \r
+ Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0)); \r
});\r
}\r
\r
#include "../../stdafx.h"\r
\r
#include "audio_decoder.h"\r
+\r
#include "audio_resampler.h"\r
\r
#include "../util.h"\r
\r
#include <tbb/cache_aligned_allocator.h>\r
\r
+#include <queue>\r
+\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
#pragma warning (pop)\r
#endif\r
\r
-#undef Yield\r
-using namespace Concurrency;\r
-\r
namespace caspar { namespace ffmpeg {\r
\r
-struct audio_decoder::implementation : public agent, boost::noncopyable\r
+struct audio_decoder::implementation : boost::noncopyable\r
{ \r
int index_;\r
std::shared_ptr<AVCodecContext> codec_context_; \r
- \r
+ const core::video_format_desc format_desc_;\r
audio_resampler resampler_;\r
- \r
+\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- unbounded_buffer<audio_decoder::source_element_t> source_;\r
- ITarget<audio_decoder::target_element_t>& target_;\r
- \r
+ std::queue<std::shared_ptr<AVPacket>> packets_;\r
public:\r
- explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
- : codec_context_(open_codec(context, AVMEDIA_TYPE_AUDIO, index_))\r
- , resampler_(format_desc.audio_channels, codec_context_->channels,\r
- format_desc.audio_sample_rate, codec_context_->sample_rate,\r
- AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
+ explicit implementation(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
+ : codec_context_(open_codec(*context, AVMEDIA_TYPE_AUDIO, index_))\r
+ , format_desc_(format_desc) \r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , source_([this](const audio_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
- , target_(target)\r
- { \r
- CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
-\r
- source.link_target(&source_);\r
-\r
- start();\r
+ , resampler_(format_desc_.audio_channels, codec_context_->channels,\r
+ format_desc_.audio_sample_rate, codec_context_->sample_rate,\r
+ AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
+ { \r
}\r
\r
- ~implementation()\r
- {\r
- agent::wait(this);\r
- }\r
+ void push(const std::shared_ptr<AVPacket>& packet)\r
+ { \r
+ if(packet && packet->stream_index != index_)\r
+ return;\r
\r
- virtual void run()\r
+ packets_.push(packet);\r
+ } \r
+ \r
+ std::vector<std::shared_ptr<core::audio_buffer>> poll()\r
{\r
- try\r
+ std::vector<std::shared_ptr<core::audio_buffer>> result;\r
+\r
+ if(packets_.empty())\r
+ return result;\r
+ \r
+ auto packet = packets_.front();\r
+\r
+ if(packet) \r
{\r
- while(true)\r
- { \r
- auto element = receive(source_);\r
- auto packet = element.first;\r
- \r
- if(packet == loop_packet(index_))\r
- {\r
- avcodec_flush_buffers(codec_context_.get());\r
- send(target_, audio_decoder::target_element_t(loop_audio(), ticket_t()));\r
- continue;\r
- }\r
-\r
- if(packet == eof_packet(index_))\r
- break;\r
-\r
- auto result = std::make_shared<core::audio_buffer>();\r
-\r
- while(packet->size > 0)\r
- {\r
- buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
- int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+ result.push_back(decode(*packet));\r
+ if(packet->size == 0) \r
+ packets_.pop();\r
+ }\r
+ else \r
+ { \r
+ avcodec_flush_buffers(codec_context_.get());\r
+ result.push_back(nullptr);\r
+ packets_.pop();\r
+ } \r
+\r
+ return result;\r
+ }\r
+ \r
+ std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
+ { \r
+ buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+ int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
\r
- int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+ int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, &pkt), "[audio_decoder]");\r
\r
- // There might be several frames in one packet.\r
- packet->size -= ret;\r
- packet->data += ret;\r
+ // There might be several frames in one packet.\r
+ pkt.size -= ret;\r
+ pkt.data += ret;\r
\r
- buffer1_.resize(written_bytes);\r
+ buffer1_.resize(written_bytes);\r
\r
- buffer1_ = resampler_.resample(std::move(buffer1_));\r
+ buffer1_ = resampler_.resample(std::move(buffer1_));\r
\r
- const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
- const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
-\r
- send(target_, audio_decoder::target_element_t(make_safe<core::audio_buffer>(samples, samples + n_samples), element.second));\r
- Context::Yield();\r
- }\r
- }\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
+ const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+ const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
\r
- send(target_, audio_decoder::target_element_t(eof_audio(), ticket_t()));\r
+ return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
+ }\r
\r
- done();\r
+ bool ready() const\r
+ {\r
+ return !packets_.empty();\r
}\r
};\r
\r
-audio_decoder::audio_decoder(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
- : impl_(new implementation(source, target, context, format_desc))\r
-{\r
-}\r
-\r
-int64_t audio_decoder::nb_frames() const\r
-{\r
- return 0;\r
-}\r
+audio_decoder::audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){}\r
+void audio_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
+bool audio_decoder::ready() const{return impl_->ready();}\r
+std::vector<std::shared_ptr<core::audio_buffer>> audio_decoder::poll(){return impl_->poll();}\r
\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
-#include "../util.h"\r
-\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
-#include <agents.h>\r
#include <vector>\r
\r
struct AVPacket;\r
class audio_decoder : boost::noncopyable\r
{\r
public:\r
-\r
- typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
- typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t> target_element_t;\r
-\r
- typedef Concurrency::ISource<source_element_t>& source_t;\r
- typedef Concurrency::ITarget<target_element_t>& target_t;\r
+ explicit audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
\r
- explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
+ void push(const std::shared_ptr<AVPacket>& packet);\r
+ bool ready() const;\r
+ std::vector<std::shared_ptr<core::audio_buffer>> poll();\r
\r
- int64_t nb_frames() const;\r
-\r
private:\r
- \r
struct implementation;\r
safe_ptr<implementation> impl_;\r
};\r
{ \r
std::shared_ptr<ReSampleContext> resampler_;\r
\r
+ std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> copy_buffer_;\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer2_;\r
\r
const size_t output_channels_;\r
L" audio_channels:" << input_channels <<\r
L" sample_fmt:" << input_sample_format;\r
\r
- CASPAR_VERIFY(resampler, caspar_exception());\r
-\r
- resampler_.reset(resampler, audio_resample_close);\r
+ if(resampler)\r
+ resampler_.reset(resampler, audio_resample_close);\r
+ else\r
+ BOOST_THROW_EXCEPTION(caspar_exception());\r
} \r
}\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> resample(std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>>&& data)\r
{\r
- if(resampler_ && !data.empty())\r
+ if(resampler_)\r
{\r
buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
auto ret = audio_resample(resampler_.get(),\r
#include "util.h"\r
#include "audio/audio_decoder.h"\r
#include "video/video_decoder.h"\r
-#include "../ffmpeg_error.h"\r
\r
#include <common/env.h>\r
#include <common/utility/assert.h>\r
#include <boost/range/algorithm/find_if.hpp>\r
#include <boost/range/algorithm/find.hpp>\r
\r
-#include <agents.h>\r
+#include <ppl.h>\r
\r
-#include <iterator>\r
-#include <vector>\r
-#include <string>\r
+#include <agents.h>\r
\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
- \r
+ \r
struct ffmpeg_producer : public core::frame_producer\r
-{ \r
- const std::wstring filename_;\r
- const int start_;\r
- const bool loop_;\r
- const size_t length_;\r
+{\r
+ const safe_ptr<diagnostics::graph> graph_;\r
+\r
+ const std::wstring filename_;\r
\r
- call<input::target_element_t> throw_away_;\r
- unbounded_buffer<input::target_element_t> packets_;\r
- std::shared_ptr<unbounded_buffer<frame_muxer2::video_source_element_t>> video_;\r
- std::shared_ptr<unbounded_buffer<frame_muxer2::audio_source_element_t>> audio_;\r
- unbounded_buffer<frame_muxer2::target_element_t> frames_;\r
- \r
- const safe_ptr<diagnostics::graph> graph_;\r
+ boost::timer frame_timer_;\r
+ boost::timer video_timer_;\r
+ boost::timer audio_timer_;\r
\r
- input input_; \r
- std::unique_ptr<video_decoder> video_decoder_;\r
- std::unique_ptr<audio_decoder> audio_decoder_; \r
- std::unique_ptr<frame_muxer2> muxer_;\r
+ const safe_ptr<core::frame_factory> frame_factory_;\r
+ const core::video_format_desc format_desc_;\r
+ \r
+ unbounded_buffer<std::shared_ptr<AVPacket>> packets_;\r
+ input input_; \r
+ video_decoder video_decoder_;\r
+ audio_decoder audio_decoder_; \r
+ double fps_;\r
+ frame_muxer muxer_;\r
+\r
+ const int start_;\r
+ const bool loop_;\r
+ const size_t length_;\r
+\r
+ safe_ptr<core::basic_frame> last_frame_;\r
+\r
+ const size_t width_;\r
+ const size_t height_;\r
+ bool is_progressive_;\r
\r
- safe_ptr<core::basic_frame> last_frame_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
: filename_(filename)\r
+ , frame_factory_(frame_factory) \r
+ , format_desc_(frame_factory->get_video_format_desc())\r
+ , input_(packets_, graph_, filename_, loop, start, length)\r
+ , video_decoder_(input_.context(), frame_factory)\r
+ , audio_decoder_(input_.context(), frame_factory->get_video_format_desc())\r
+ , fps_(video_decoder_.fps())\r
+ , muxer_(fps_, frame_factory, filter)\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
- , throw_away_([](const input::target_element_t&){})\r
- , input_(packets_, graph_, filename_, loop, start, length)\r
, last_frame_(core::basic_frame::empty())\r
- { \r
- try\r
- {\r
- auto video = std::make_shared<unbounded_buffer<frame_muxer2::video_source_element_t>>();\r
- video_decoder_.reset(new video_decoder(packets_, *video, *input_.context()));\r
- video_ = video;\r
- }\r
- catch(averror_stream_not_found&)\r
- {\r
- CASPAR_LOG(warning) << "No video-stream found. Running without video."; \r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(warning) << "Failed to open video-stream. Running without video."; \r
- }\r
-\r
- try\r
- {\r
- auto audio = std::make_shared<unbounded_buffer<frame_muxer2::audio_source_element_t>>();\r
- audio_decoder_.reset(new audio_decoder(packets_, *audio, *input_.context(), frame_factory->get_video_format_desc()));\r
- audio_ = audio;\r
- }\r
- catch(averror_stream_not_found&)\r
- {\r
- CASPAR_LOG(warning) << "No audio-stream found. Running without video."; \r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(warning) << "Failed to open audio-stream. Running without audio."; \r
- } \r
-\r
- CASPAR_VERIFY(video_decoder_ || audio_decoder_, ffmpeg_error());\r
- \r
- packets_.link_target(&throw_away_);\r
- muxer_.reset(new frame_muxer2(video_.get(), audio_.get(), frames_, video_decoder_ ? video_decoder_->fps() : frame_factory->get_video_format_desc().fps, frame_factory));\r
- \r
+ , width_(video_decoder_.width())\r
+ , height_(video_decoder_.height())\r
+ , is_progressive_(true)\r
+ {\r
+ graph_->add_guide("frame-time", 0.5);\r
+ graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
- graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
-\r
- input_.start();\r
}\r
\r
~ffmpeg_producer()\r
{\r
- input_.stop(); \r
+ input_.stop();\r
}\r
- \r
+ \r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
auto frame = core::basic_frame::late();\r
\r
- try\r
- { \r
- auto frame_element = Concurrency::receive(frames_, 10);\r
- frame = last_frame_ = frame_element.first;\r
- graph_->set_text(narrow(print()));\r
- }\r
- catch(operation_timed_out&)\r
- { \r
- graph_->add_tag("underflow"); \r
+ frame_timer_.restart();\r
+ \r
+ for(int n = 0; n < 64 && muxer_.empty(); ++n)\r
+ decode_frame(hints);\r
+ \r
+ graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
+\r
+ if(!muxer_.empty())\r
+ frame = last_frame_ = muxer_.pop(); \r
+ else\r
+ {\r
+ if(input_.eof())\r
+ return core::basic_frame::eof();\r
+ else \r
+ graph_->add_tag("underflow"); \r
}\r
\r
+ graph_->set_text(narrow(print()));\r
+ \r
return frame;\r
}\r
\r
{\r
return disable_audio(last_frame_);\r
}\r
- \r
- virtual int64_t nb_frames() const\r
+\r
+ void push_packets()\r
+ {\r
+ for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
+ {\r
+ std::shared_ptr<AVPacket> pkt;\r
+ if(try_receive(packets_, pkt))\r
+ {\r
+ video_decoder_.push(pkt);\r
+ audio_decoder_.push(pkt);\r
+ }\r
+ }\r
+ }\r
+\r
+ void decode_frame(int hints)\r
+ {\r
+ push_packets();\r
+ \r
+ parallel_invoke(\r
+ [&]\r
+ {\r
+ if(muxer_.video_ready())\r
+ return;\r
+\r
+ auto video_frames = video_decoder_.poll();\r
+ BOOST_FOREACH(auto& video, video_frames) \r
+ {\r
+ is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
+ muxer_.push(video, hints); \r
+ }\r
+ },\r
+ [&]\r
+ {\r
+ if(muxer_.audio_ready())\r
+ return;\r
+ \r
+ auto audio_samples = audio_decoder_.poll();\r
+ BOOST_FOREACH(auto& audio, audio_samples)\r
+ muxer_.push(audio); \r
+ });\r
+\r
+ muxer_.commit();\r
+ }\r
+\r
+ virtual int64_t nb_frames() const \r
{\r
if(loop_)\r
return std::numeric_limits<int64_t>::max();\r
int64_t nb_frames = input_.nb_frames();\r
if(input_.nb_loops() < 1) // input still hasn't counted all frames\r
{\r
- int64_t video_nb_frames = video_decoder_->nb_frames();\r
- int64_t audio_nb_frames = audio_decoder_->nb_frames();\r
+ int64_t video_nb_frames = video_decoder_.nb_frames();\r
\r
- nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, std::max(video_nb_frames, audio_nb_frames)));\r
+ nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, video_nb_frames));\r
}\r
\r
- nb_frames = muxer_->calc_nb_frames(nb_frames);\r
- \r
- return nb_frames - start_;\r
- }\r
+ nb_frames = muxer_.calc_nb_frames(nb_frames);\r
\r
- virtual void param(const std::wstring& param)\r
- {\r
- typedef std::istream_iterator<std::wstring, wchar_t, std::char_traits<wchar_t>> wistream_iterator;\r
- std::wstringstream str(param);\r
- std::vector<std::wstring> params;\r
- std::copy(wistream_iterator(str), wistream_iterator(), std::back_inserter(params));\r
+	// TODO: Might need to scale nb_frames to account for frame_muxer transformations.\r
\r
- if(boost::iequals(params.at(0), L"LOOP"))\r
- input_.loop(boost::lexical_cast<bool>(params.at(1)));\r
+ return nb_frames - start_;\r
}\r
\r
virtual std::wstring print() const\r
{\r
- if(video_decoder_)\r
- {\r
- return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
- + boost::lexical_cast<std::wstring>(video_decoder_->width()) + L"x" + boost::lexical_cast<std::wstring>(video_decoder_->height())\r
- + (video_decoder_->is_progressive() ? L"p" : L"i") + boost::lexical_cast<std::wstring>(video_decoder_->is_progressive() ? video_decoder_->fps() : 2.0 * video_decoder_->fps())\r
- + L"]";\r
- }\r
- \r
- return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"]";\r
+ return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
+ + boost::lexical_cast<std::wstring>(width_) + L"x" + boost::lexical_cast<std::wstring>(height_)\r
+ + (is_progressive_ ? L"p" : L"i") + boost::lexical_cast<std::wstring>(is_progressive_ ? fps_ : 2.0 * fps_)\r
+ + L"]";\r
}\r
};\r
\r
\r
#include <core/producer/frame_producer.h>\r
#include <core/producer/frame/basic_frame.h>\r
+#include <core/producer/frame/frame_transform.h>\r
+#include <core/producer/frame/pixel_format.h>\r
#include <core/producer/frame/frame_factory.h>\r
#include <core/mixer/write_frame.h>\r
\r
#include <common/exception/exceptions.h>\r
#include <common/log/log.h>\r
\r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <agents.h>\r
+#include <boost/foreach.hpp>\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+\r
+#include <deque>\r
+#include <queue>\r
+#include <vector>\r
\r
-using namespace Concurrency;\r
+using namespace caspar::core;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
-{ \r
- typedef std::pair<safe_ptr<AVFrame>, ticket_t> video_element_t;\r
- typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t> audio_element_t;\r
-\r
- frame_muxer2::video_source_t* video_source_;\r
- frame_muxer2::audio_source_t* audio_source_;\r
-\r
- ITarget<frame_muxer2::target_element_t>& target_;\r
- mutable single_assignment<display_mode::type> display_mode_;\r
+struct frame_muxer::implementation : boost::noncopyable\r
+{ \r
+ std::deque<std::queue<safe_ptr<write_frame>>> video_streams_;\r
+ std::deque<core::audio_buffer> audio_streams_;\r
+ std::deque<safe_ptr<basic_frame>> frame_buffer_;\r
+ display_mode::type display_mode_;\r
const double in_fps_;\r
- const core::video_format_desc format_desc_;\r
- const bool auto_transcode_;\r
- \r
- mutable single_assignment<safe_ptr<filter>> filter_;\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
- \r
- core::audio_buffer audio_data_;\r
- \r
- std::queue<video_element_t> video_frames_;\r
- std::queue<audio_element_t> audio_buffers_;\r
+ const video_format_desc format_desc_;\r
+ bool auto_transcode_;\r
\r
+ size_t audio_sample_count_;\r
+ size_t video_frame_count_;\r
+ \r
+ size_t processed_audio_sample_count_;\r
+ size_t processed_video_frame_count_;\r
+\r
+ filter filter_;\r
+ safe_ptr<core::frame_factory> frame_factory_;\r
std::wstring filter_str_;\r
- bool eof_;\r
- \r
- implementation(frame_muxer2::video_source_t* video_source,\r
- frame_muxer2::audio_source_t* audio_source,\r
- frame_muxer2::target_t& target,\r
- double in_fps, \r
- const safe_ptr<core::frame_factory>& frame_factory,\r
- const std::wstring& filter)\r
- : video_source_(video_source)\r
- , audio_source_(audio_source)\r
- , target_(target)\r
+ \r
+ implementation(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+ : video_streams_(1)\r
+ , audio_streams_(1)\r
+ , display_mode_(display_mode::invalid)\r
, in_fps_(in_fps)\r
, format_desc_(frame_factory->get_video_format_desc())\r
, auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
+ , audio_sample_count_(0)\r
+ , video_frame_count_(0)\r
, frame_factory_(frame_factory)\r
- , eof_(false)\r
- { \r
- start();\r
- }\r
-\r
- ~implementation()\r
+ , filter_str_(filter_str)\r
{\r
- agent::wait(this);\r
}\r
- \r
- safe_ptr<core::write_frame> receive_video(ticket_t& tickets)\r
- { \r
- if(!video_source_)\r
- return make_safe<core::write_frame>(this); \r
\r
- if(!video_frames_.empty())\r
+ void push(const std::shared_ptr<AVFrame>& video_frame, int hints)\r
+ { \r
+ if(!video_frame)\r
+ { \r
+ CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
+ video_frame_count_ = 0;\r
+ video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
+ return;\r
+ }\r
+\r
+ if(video_frame->data[0] == nullptr)\r
{\r
- auto video_frame = std::move(video_frames_.front());\r
- video_frames_.pop();\r
- boost::range::push_back(tickets, video_frame.second);\r
- return make_write_frame(this, video_frame.first, frame_factory_, 0);\r
+ video_streams_.back().push(make_safe<core::write_frame>(this));\r
+ ++video_frame_count_;\r
+ display_mode_ = display_mode::simple;\r
+ return;\r
}\r
\r
- auto element = receive(video_source_);\r
- auto video = element.first;\r
+ if(display_mode_ == display_mode::invalid)\r
+ initialize_display_mode(*video_frame);\r
+ \r
+ if(hints & core::frame_producer::ALPHA_HINT)\r
+ video_frame->format = make_alpha_format(video_frame->format);\r
\r
- if(video == loop_video())\r
- return receive_video(tickets);\r
+ auto format = video_frame->format;\r
+ if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+ video_frame->format = PIX_FMT_GRAY8;\r
\r
- if(eof_ || video == eof_video())\r
+ filter_.push(video_frame);\r
+ BOOST_FOREACH(auto& av_frame, filter_.poll_all())\r
{\r
- eof_ = true;\r
- return make_safe<core::write_frame>(this); \r
- } \r
+ av_frame->format = format;\r
+ \r
+ video_streams_.back().push(make_write_frame(this, av_frame, frame_factory_, hints));\r
+ ++video_frame_count_;\r
+ }\r
\r
- if(!display_mode_.has_value())\r
- initialize_display_mode(*video);\r
- \r
- filter_.value()->push(video);\r
- for(auto frame = filter_.value()->poll(); frame; frame = filter_.value()->poll()) \r
- video_frames_.push(video_element_t(make_safe_ptr(frame), element.second)); \r
- \r
- return receive_video(tickets);\r
+ if(video_streams_.back().size() > 8)\r
+ BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
}\r
- \r
- std::shared_ptr<core::audio_buffer> receive_audio(ticket_t& tickets)\r
- { \r
- if(!audio_source_)\r
- return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
\r
- if(!audio_buffers_.empty())\r
+ void push(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
+ {\r
+ if(!audio_samples) \r
{\r
- auto audio_buffer = std::move(audio_buffers_.front());\r
- audio_buffers_.pop();\r
- boost::range::push_back(tickets, audio_buffer.second);\r
- return audio_buffer.first;\r
+ CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
+ audio_streams_.push_back(core::audio_buffer());\r
+ audio_sample_count_ = 0;\r
+ return;\r
}\r
+\r
+ audio_sample_count_ += audio_samples->size();\r
+\r
+ boost::range::push_back(audio_streams_.back(), *audio_samples);\r
+\r
+ if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
+ BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ }\r
+\r
+ safe_ptr<basic_frame> pop()\r
+ { \r
+ auto frame = frame_buffer_.front();\r
+ frame_buffer_.pop_front(); \r
+ return frame;\r
+ }\r
+\r
+ size_t size() const\r
+ {\r
+ return frame_buffer_.size();\r
+ }\r
+\r
+ safe_ptr<core::write_frame> pop_video()\r
+ {\r
+ auto frame = video_streams_.front().front();\r
+ video_streams_.front().pop();\r
\r
- auto element = receive(audio_source_);\r
- auto audio = element.first;\r
+ return frame;\r
+ }\r
\r
- if(audio == loop_audio())\r
- {\r
- if(!audio_data_.empty())\r
- {\r
- CASPAR_LOG(info) << L"[frame_muxer] Truncating audio: " << audio_data_.size();\r
- audio_data_.clear();\r
- }\r
- return receive_audio(tickets);\r
- }\r
+ core::audio_buffer pop_audio()\r
+ {\r
+ auto begin = audio_streams_.front().begin();\r
+ auto end = begin + format_desc_.audio_samples_per_frame;\r
\r
- if(eof_ || audio == eof_audio())\r
- {\r
- eof_ = true;\r
- return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
- } \r
- \r
- audio_data_.insert(audio_data_.end(), audio->begin(), audio->end()); \r
- while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
+ auto samples = core::audio_buffer(begin, end);\r
+ audio_streams_.front().erase(begin, end);\r
+\r
+ return samples;\r
+ }\r
+ \r
+ bool video_ready() const\r
+ { \r
+ return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2());\r
+ }\r
+ \r
+ bool audio_ready() const\r
+ {\r
+ return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2());\r
+ }\r
+\r
+ bool video_ready2() const\r
+ { \r
+ switch(display_mode_)\r
{\r
- auto begin = audio_data_.begin(); \r
- auto end = begin + format_desc_.audio_samples_per_frame;\r
- auto audio = make_safe<core::audio_buffer>(begin, end);\r
- audio_data_.erase(begin, end);\r
- audio_buffers_.push(frame_muxer2::audio_source_element_t(audio, element.second));\r
+ case display_mode::deinterlace_bob_reinterlace: \r
+ case display_mode::interlace: \r
+ return video_streams_.front().size() >= 2;\r
+ default: \r
+ return !video_streams_.front().empty();\r
}\r
-\r
- return receive_audio(tickets);\r
}\r
- \r
- virtual void run()\r
+ \r
+ bool audio_ready2() const\r
{\r
- try\r
+ switch(display_mode_)\r
{\r
- while(!eof_)\r
- {\r
- ticket_t tickets;\r
- \r
- auto video = receive_video(tickets);\r
- video->audio_data() = std::move(*receive_audio(tickets));\r
-\r
- if(eof_)\r
- break;\r
-\r
- switch(display_mode_.value())\r
- {\r
- case display_mode::simple: \r
- case display_mode::deinterlace:\r
- case display_mode::deinterlace_bob:\r
- {\r
- send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets)));\r
-\r
- break;\r
- }\r
- case display_mode::duplicate: \r
- { \r
- auto video2 = make_safe<core::write_frame>(*video); \r
- video2->audio_data() = std::move(*receive_audio(tickets));\r
-\r
- send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets))); \r
- send(target_, frame_muxer2::target_element_t(std::move(video2), std::move(tickets)));\r
-\r
- break;\r
- }\r
- case display_mode::half: \r
- { \r
- send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets)));\r
- receive_video(tickets);\r
-\r
- break;\r
- }\r
- case display_mode::deinterlace_bob_reinterlace:\r
- case display_mode::interlace: \r
- { \r
- auto video2 = receive_video(tickets);\r
- \r
- auto frame = core::basic_frame::interlace(std::move(video), std::move(video2), format_desc_.field_mode); \r
- send(target_, frame_muxer2::target_element_t(std::move(frame), std::move(tickets)));\r
-\r
- break;\r
- }\r
- default: \r
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
- }\r
- } \r
+ case display_mode::duplicate: \r
+ return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame;\r
+ default: \r
+ return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame;\r
}\r
- catch(...)\r
+ }\r
+ \r
+ void commit()\r
+ {\r
+ if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2()))\r
{\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
+ CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
+\r
+ video_streams_.pop_front();\r
+ audio_streams_.pop_front();\r
}\r
+\r
+ if(!video_ready2() || !audio_ready2())\r
+ return;\r
\r
- send(target_, frame_muxer2::target_element_t(core::basic_frame::eof(), ticket_t()));\r
+ switch(display_mode_)\r
+ {\r
+ case display_mode::simple:\r
+ case display_mode::deinterlace_bob:\r
+ case display_mode::deinterlace: \r
+ return simple(frame_buffer_);\r
+ case display_mode::duplicate: \r
+ return duplicate(frame_buffer_);\r
+ case display_mode::half: \r
+ return half(frame_buffer_);\r
+ case display_mode::interlace:\r
+ case display_mode::deinterlace_bob_reinterlace: \r
+ return interlace(frame_buffer_);\r
+ default: \r
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+ }\r
+ }\r
+ \r
+ void simple(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+\r
+ dest.push_back(frame1); \r
+ }\r
+\r
+ void duplicate(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame = pop_video();\r
+\r
+ auto frame1 = make_safe<core::write_frame>(*frame); // make a copy\r
+ frame1->audio_data() = pop_audio();\r
+\r
+ auto frame2 = frame;\r
+ frame2->audio_data() = pop_audio();\r
+\r
+ dest.push_back(frame1);\r
+ dest.push_back(frame2);\r
+ }\r
+\r
+ void half(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+ \r
+ video_streams_.front().pop(); // Throw away\r
\r
- done();\r
+ dest.push_back(frame1);\r
}\r
+ \r
+ void interlace(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+ \r
+ auto frame2 = pop_video();\r
\r
+ dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); \r
+ }\r
+ \r
void initialize_display_mode(AVFrame& frame)\r
{\r
auto display_mode = display_mode::invalid;\r
display_mode = display_mode::simple;\r
}\r
\r
- send(filter_, make_safe<filter>(filter_str_));\r
+ filter_ = filter(filter_str_);\r
\r
CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode);\r
\r
- send(display_mode_, display_mode);\r
+ display_mode_ = display_mode;\r
}\r
- \r
+ \r
+\r
int64_t calc_nb_frames(int64_t nb_frames) const\r
{\r
- switch(display_mode_.value()) // Take into account transformation in run.\r
+ switch(display_mode_) // Take into account transformation in run.\r
{\r
case display_mode::deinterlace_bob_reinterlace:\r
case display_mode::interlace: \r
break;\r
}\r
\r
- if(is_double_rate(widen(filter_.value()->filter_str()))) // Take into account transformations in filter.\r
+ if(is_double_rate(widen(filter_.filter_str()))) // Take into account transformations in filter.\r
nb_frames *= 2;\r
\r
return nb_frames;\r
}\r
};\r
\r
-frame_muxer2::frame_muxer2(video_source_t* video_source, \r
- audio_source_t* audio_source,\r
- target_t& target,\r
- double in_fps, \r
- const safe_ptr<core::frame_factory>& frame_factory,\r
- const std::wstring& filter)\r
- : impl_(new implementation(video_source, audio_source, target, in_fps, frame_factory, filter))\r
-{\r
-}\r
-\r
-int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
-{\r
- return impl_->calc_nb_frames(nb_frames);\r
-}\r
+frame_muxer::frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+ : impl_(new implementation(in_fps, frame_factory, filter_str)){}\r
+void frame_muxer::push(const std::shared_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
+void frame_muxer::push(const std::shared_ptr<core::audio_buffer>& audio_samples){return impl_->push(audio_samples);}\r
+void frame_muxer::commit(){impl_->commit();}\r
+safe_ptr<basic_frame> frame_muxer::pop(){return impl_->pop();}\r
+size_t frame_muxer::size() const {return impl_->size();}\r
+bool frame_muxer::empty() const {return impl_->size() == 0;}\r
+bool frame_muxer::video_ready() const{return impl_->video_ready();}\r
+bool frame_muxer::audio_ready() const{return impl_->audio_ready();}\r
+int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
\r
}}
\ No newline at end of file
#pragma once\r
\r
-#include "util.h"\r
-\r
#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
-#include <agents.h>\r
-\r
#include <vector>\r
\r
struct AVFrame;\r
\r
namespace ffmpeg {\r
\r
-class frame_muxer2 : boost::noncopyable\r
+class frame_muxer : boost::noncopyable\r
{\r
public:\r
+ frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str);\r
\r
- typedef std::pair<safe_ptr<AVFrame>, ticket_t> video_source_element_t;\r
- typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t> audio_source_element_t;\r
- typedef std::pair<safe_ptr<core::basic_frame>, ticket_t> target_element_t;\r
-\r
- typedef Concurrency::ISource<video_source_element_t> video_source_t;\r
- typedef Concurrency::ISource<audio_source_element_t> audio_source_t;\r
- typedef Concurrency::ITarget<target_element_t> target_t;\r
- \r
- frame_muxer2(video_source_t* video_source,\r
- audio_source_t* audio_source, \r
- target_t& target,\r
- double in_fps, \r
- const safe_ptr<core::frame_factory>& frame_factory,\r
- const std::wstring& filter = L"");\r
+ void push(const std::shared_ptr<AVFrame>& video_frame, int hints = 0);\r
+ void push(const std::shared_ptr<core::audio_buffer>& audio_samples);\r
\r
+ void commit();\r
+\r
+ bool video_ready() const;\r
+ bool audio_ready() const;\r
+\r
+ size_t size() const;\r
+ bool empty() const;\r
+\r
int64_t calc_nb_frames(int64_t nb_frames) const;\r
+\r
+ safe_ptr<core::basic_frame> pop();\r
private:\r
struct implementation;\r
safe_ptr<implementation> impl_;\r
#include "../stdafx.h"\r
\r
#include "input.h"\r
+\r
#include "util.h"\r
#include "../ffmpeg_error.h"\r
\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
+#include <tbb/concurrent_queue.h>\r
#include <tbb/atomic.h>\r
\r
-#include <agents.h>\r
-#include <concrt_extras.h>\r
+#include <boost/range/algorithm.hpp>\r
+#include <boost/thread/condition_variable.hpp>\r
+#include <boost/thread/mutex.hpp>\r
+#include <boost/thread/thread.hpp>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
+#include <concrt_extras.h>\r
\r
-#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-static const size_t MAX_TOKENS = 32;\r
+static const size_t MAX_BUFFER_SIZE = 16 * 1000000;\r
+static const size_t MAX_BUFFER_COUNT = 100;\r
\r
-struct input::implementation : public Concurrency::agent, boost::noncopyable\r
-{\r
- input::target_t& target_;\r
-\r
- const std::wstring filename_;\r
- const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
- int default_stream_index_;\r
- const boost::iterator_range<AVStream**> streams_;\r
+struct input::implementation : public agent, public std::enable_shared_from_this<input::implementation>, boost::noncopyable\r
+{ \r
+ input::target_t& target_;\r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- safe_ptr<diagnostics::graph> graph_;\r
+ const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
+ const int default_stream_index_;\r
\r
- tbb::atomic<bool> loop_;\r
- const size_t start_; \r
- const size_t length_;\r
- size_t frame_number_;\r
- \r
- tbb::atomic<size_t> nb_frames_;\r
- tbb::atomic<size_t> nb_loops_; \r
- tbb::atomic<size_t> packets_count_;\r
- tbb::atomic<size_t> packets_size_;\r
-\r
- overwrite_buffer<bool> is_running_;\r
- governor governor_;\r
+ const std::wstring filename_;\r
+ const bool loop_;\r
+ const size_t start_; \r
+ const size_t length_;\r
+ size_t frame_number_;\r
+ \r
+ tbb::atomic<size_t> buffer_size_;\r
+ tbb::atomic<size_t> buffer_count_;\r
+ Concurrency::event event_;\r
\r
+ tbb::atomic<bool> is_running_;\r
+\r
+ tbb::atomic<size_t> nb_frames_;\r
+ tbb::atomic<size_t> nb_loops_;\r
+\r
public:\r
- explicit implementation(input::target_t& target,\r
- const safe_ptr<diagnostics::graph>& graph, \r
- const std::wstring& filename, \r
- bool loop, \r
- size_t start,\r
- size_t length)\r
+ explicit implementation(input::target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
: target_(target)\r
- , filename_(filename)\r
- , format_context_(open_input(filename)) \r
- , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
- , streams_(format_context_->streams, format_context_->streams + format_context_->nb_streams)\r
, graph_(graph)\r
+ , format_context_(open_input(filename))\r
+ , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
+ , loop_(loop)\r
+ , filename_(filename)\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
- , governor_(8)\r
- { \r
- loop_ = loop;\r
- packets_count_ = 0;\r
- packets_size_ = 0;\r
- nb_frames_ = 0;\r
- nb_loops_ = 0;\r
- \r
- //av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
- \r
+ { \r
+ event_.set();\r
+ \r
if(start_ > 0) \r
seek_frame(start_);\r
\r
- graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+ graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
+ graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
+\r
+ agent::start();\r
}\r
\r
~implementation()\r
{\r
- if(is_running_.value())\r
- stop();\r
+ is_running_ = false;\r
+ event_.set();\r
+ agent::wait(this);\r
}\r
- \r
+ \r
void stop()\r
{\r
- send(is_running_, false);\r
- governor_.cancel();\r
+ is_running_ = false;\r
+ event_.set();\r
agent::wait(this);\r
}\r
- \r
- virtual void run()\r
- {\r
- try\r
- {\r
- int last_stream_index = -1;\r
\r
- send(is_running_, true);\r
- for(auto packet = read_next_packet(); packet && is_running_.value(); packet = read_next_packet())\r
- { \r
- ticket_t ticket;\r
+ size_t nb_frames() const\r
+ {\r
+ return nb_frames_;\r
+ }\r
\r
- if((format_context_->nb_streams < 2 || last_stream_index != packet->stream_index) && packet->stream_index == default_stream_index_)\r
- ticket = governor_.acquire();\r
- last_stream_index = packet->stream_index;\r
+ size_t nb_loops() const\r
+ {\r
+ return nb_loops_;\r
+ }\r
\r
- Concurrency::asend(target_, input::target_element_t(packet, ticket));\r
- Context::Yield();\r
+private:\r
+ \r
+ virtual void run()\r
+ { \r
+ try\r
+ { \r
+ is_running_ = true;\r
+ while(is_running_)\r
+ {\r
+ read_next_packet(); \r
+ event_.wait(); \r
}\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- } \r
- \r
- BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, input::target_element_t(eof_packet(stream->index), ticket_t())); \r
-\r
+ }\r
+ \r
+ is_running_ = false;\r
done();\r
}\r
-\r
- std::shared_ptr<AVPacket> read_next_packet()\r
- { \r
+ \r
+ void read_next_packet()\r
+ { \r
auto packet = create_packet();\r
- \r
+\r
int ret = [&]() -> int\r
{\r
- Concurrency::scoped_oversubcription_token oversubscribe;\r
+ scoped_oversubcription_token oversubscribe;\r
return av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
}();\r
\r
{\r
++nb_loops_;\r
frame_number_ = 0;\r
-\r
+ \r
if(loop_)\r
{\r
CASPAR_LOG(trace) << print() << " Looping.";\r
else\r
{\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
- return nullptr;\r
+ is_running_ = false;\r
}\r
}\r
+ else\r
+ { \r
+ THROW_ON_ERROR(ret, "av_read_frame", print());\r
\r
- THROW_ON_ERROR(ret, "av_read_frame", print());\r
-\r
- if(packet->stream_index == default_stream_index_)\r
- {\r
- if(nb_loops_ == 0)\r
- ++nb_frames_;\r
- ++frame_number_;\r
- }\r
+ if(packet->stream_index == default_stream_index_)\r
+ {\r
+ if(nb_loops_ == 0)\r
+ ++nb_frames_;\r
+ ++frame_number_;\r
+ }\r
\r
- THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
\r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = packet->size;\r
- auto data = packet->data; \r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data;\r
\r
- packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
- {\r
- packet->size = size;\r
- packet->data = data;\r
- --packets_count_;\r
- });\r
-\r
- ++packets_count_;\r
- \r
- return packet;\r
- }\r
+ auto self = shared_from_this();\r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data;\r
+\r
+ self->buffer_size_ -= packet->size;\r
+ --self->buffer_count_;\r
+ self->event_.set();\r
+ self->graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ self->graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
+ });\r
+ \r
+ buffer_size_ += packet->size;\r
+ ++buffer_count_;\r
+ if((buffer_size_ > MAX_BUFFER_SIZE || buffer_count_ > MAX_BUFFER_COUNT) && is_running_)\r
+ event_.reset();\r
\r
+ send(target_, std::shared_ptr<AVPacket>(packet));\r
+ \r
+ graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
+ } \r
+ }\r
+ \r
void seek_frame(int64_t frame, int flags = 0)\r
{ \r
if(flags == AVSEEK_FLAG_BACKWARD)\r
}\r
\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
- auto packet = create_packet();\r
- packet->size = 0;\r
-\r
- BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::asend(target_, input::target_element_t(loop_packet(stream->index), ticket_t())); \r
+ \r
+ Concurrency::asend(target_, std::shared_ptr<AVPacket>()); \r
\r
graph_->add_tag("seek"); \r
- } \r
+ } \r
\r
bool is_eof(int ret)\r
{\r
if(ret == AVERROR_EOF)\r
CASPAR_LOG(trace) << print() << " Received EOF. " << nb_frames_;\r
\r
- CASPAR_VERIFY(ret >= 0 || ret == AVERROR_EOF || ret == AVERROR(EIO), ffmpeg_error() << source_info(narrow(print())));\r
-\r
return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF;\r
}\r
\r
}\r
};\r
\r
-input::input(input::target_t& target,\r
- const safe_ptr<diagnostics::graph>& graph, \r
- const std::wstring& filename, \r
- bool loop, \r
- size_t start, \r
- size_t length)\r
- : impl_(new implementation(target, graph, filename, loop, start, length))\r
-{\r
-}\r
-\r
-safe_ptr<AVFormatContext> input::context()\r
-{\r
- return safe_ptr<AVFormatContext>(impl_->format_context_);\r
-}\r
-\r
-size_t input::nb_frames() const\r
-{\r
- return impl_->nb_frames_;\r
-}\r
-\r
-size_t input::nb_loops() const \r
-{\r
- return impl_->nb_loops_;\r
-}\r
-\r
-void input::start()\r
-{\r
- impl_->start();\r
-}\r
-\r
-void input::stop()\r
-{\r
- impl_->stop();\r
-}\r
-\r
-bool input::loop() const\r
-{\r
- return impl_->loop_;\r
-}\r
-void input::loop(bool value)\r
-{\r
- impl_->loop_ = value;\r
-}\r
-\r
+input::input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length)\r
+ : impl_(new implementation(target, graph, filename, loop, start, length)){}\r
+bool input::eof() const {return !impl_->is_running_;}\r
+safe_ptr<AVFormatContext> input::context(){return impl_->format_context_;}\r
+size_t input::nb_frames() const {return impl_->nb_frames();}\r
+size_t input::nb_loops() const {return impl_->nb_loops();}\r
+void input::stop(){impl_->stop();}\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
-#include "util.h"\r
-\r
#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
-\r
-#include <agents.h>\r
-#include <concrt.h>\r
\r
#include <memory>\r
#include <string>\r
\r
#include <boost/noncopyable.hpp>\r
-#include <boost/range/iterator_range.hpp>\r
+\r
+#include <agents.h>\r
\r
struct AVFormatContext;\r
struct AVPacket;\r
}\r
\r
namespace ffmpeg {\r
- \r
+\r
class input : boost::noncopyable\r
{\r
public:\r
- \r
- typedef std::pair<safe_ptr<AVPacket>, ticket_t> target_element_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
+\r
+ explicit input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
\r
- typedef Concurrency::ITarget<target_element_t> target_t;\r
+ bool eof() const;\r
+\r
+ void stop();\r
\r
- explicit input(target_t& target, \r
- const safe_ptr<diagnostics::graph>& graph, \r
- const std::wstring& filename, bool loop, \r
- size_t start = 0, \r
- size_t length = std::numeric_limits<size_t>::max());\r
- \r
size_t nb_frames() const;\r
size_t nb_loops() const;\r
- \r
- safe_ptr<AVFormatContext> context();\r
\r
- bool loop() const;\r
- void loop(bool value);\r
-\r
- void start();\r
- void stop();\r
+ safe_ptr<AVFormatContext> context();\r
private:\r
- friend struct implemenation;\r
struct implementation;\r
std::shared_ptr<implementation> impl_;\r
};\r
\r
#include "../../ffmpeg_error.h"\r
\r
-#include <core/producer/frame/basic_frame.h>\r
-#include <common/memory/memcpy.h>\r
+#include <core/producer/frame/frame_transform.h>\r
+#include <core/producer/frame/frame_factory.h>\r
+\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+#include <boost/filesystem.hpp>\r
+\r
+#include <queue>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <tbb/scalable_allocator.h>\r
-\r
-#undef Yield\r
-using namespace Concurrency;\r
-\r
namespace caspar { namespace ffmpeg {\r
\r
-struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
-{ \r
+struct video_decoder::implementation : boost::noncopyable\r
+{\r
+ const safe_ptr<core::frame_factory> frame_factory_;\r
int index_;\r
- std::shared_ptr<AVCodecContext> codec_context_;\r
- \r
- double fps_;\r
- int64_t nb_frames_;\r
+ safe_ptr<AVCodecContext> codec_context_;\r
\r
- size_t width_;\r
- size_t height_;\r
- bool is_progressive_;\r
- \r
- unbounded_buffer<video_decoder::source_element_t> source_;\r
- ITarget<video_decoder::target_element_t>& target_;\r
+ std::queue<std::shared_ptr<AVPacket>> packets_;\r
\r
+ const double fps_;\r
+ const int64_t nb_frames_;\r
+ const size_t width_;\r
+ const size_t height_;\r
+\r
public:\r
- explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
- : codec_context_(open_codec(context, AVMEDIA_TYPE_VIDEO, index_))\r
+ explicit implementation(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) \r
+ : frame_factory_(frame_factory)\r
+ , codec_context_(open_codec(*context, AVMEDIA_TYPE_VIDEO, index_))\r
, fps_(static_cast<double>(codec_context_->time_base.den) / static_cast<double>(codec_context_->time_base.num))\r
- , nb_frames_(context.streams[index_]->nb_frames)\r
+ , nb_frames_(context->streams[index_]->nb_frames)\r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
- , is_progressive_(true)\r
- , source_([this](const video_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
- , target_(target)\r
- { \r
- CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
- \r
- CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
- CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
+ { \r
+ }\r
\r
- source.link_target(&source_);\r
+ void push(const std::shared_ptr<AVPacket>& packet)\r
+ {\r
+ if(packet && packet->stream_index != index_)\r
+ return;\r
\r
- start();\r
+ packets_.push(packet);\r
}\r
\r
- ~implementation()\r
- {\r
- agent::wait(this);\r
+ std::vector<std::shared_ptr<AVFrame>> poll()\r
+ { \r
+ std::vector<std::shared_ptr<AVFrame>> result;\r
+\r
+ if(packets_.empty())\r
+ return result;\r
+ \r
+ auto packet = packets_.front();\r
+ \r
+ if(packet)\r
+ { \r
+ boost::range::push_back(result, decode(*packet));\r
+\r
+ if(packet->size == 0)\r
+ packets_.pop();\r
+ }\r
+ else\r
+ {\r
+ if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ {\r
+ AVPacket pkt;\r
+ av_init_packet(&pkt);\r
+ pkt.data = nullptr;\r
+ pkt.size = 0;\r
+\r
+ boost::range::push_back(result, decode(pkt)); \r
+ }\r
+\r
+ if(result.empty())\r
+ { \r
+ packets_.pop();\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ result.push_back(nullptr);\r
+ }\r
+ }\r
+ \r
+ return result;\r
}\r
\r
- std::shared_ptr<AVFrame> decode(AVPacket& packet)\r
+ std::vector<std::shared_ptr<AVFrame>> decode(AVPacket& pkt)\r
{\r
std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
\r
int frame_finished = 0;\r
- THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &packet), "[video_decocer]");\r
-\r
- // 1 packet <=> 1 frame.\r
+		THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &pkt), "[video_decoder]");
+ \r
// If a decoder consumes less then the whole packet then something is wrong\r
// that might be just harmless padding at the end, or a problem with the\r
// AVParser or demuxer which puted more then one frame in a AVPacket.\r
+ pkt.data = nullptr;\r
+ pkt.size = 0;\r
\r
if(frame_finished == 0) \r
- return nullptr;\r
- \r
- if(decoded_frame->repeat_pict > 0)\r
- CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+ return std::vector<std::shared_ptr<AVFrame>>();\r
\r
- return decoded_frame;\r
- }\r
-\r
- virtual void run()\r
- {\r
- try\r
- {\r
- while(true)\r
- {\r
- auto element = receive(source_);\r
- auto packet = element.first;\r
- \r
- if(packet == loop_packet(index_))\r
- { \r
- if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
- {\r
- AVPacket pkt;\r
- av_init_packet(&pkt);\r
- pkt.data = nullptr;\r
- pkt.size = 0;\r
-\r
- for(auto decoded_frame = decode(pkt); decoded_frame; decoded_frame = decode(pkt))\r
- {\r
- send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second));\r
- Context::Yield();\r
- }\r
- }\r
-\r
- avcodec_flush_buffers(codec_context_.get());\r
- send(target_, target_element_t(loop_video(), ticket_t()));\r
- continue;\r
- }\r
-\r
- if(packet == eof_packet(index_))\r
- break;\r
-\r
- auto decoded_frame = decode(*packet);\r
- if(!decoded_frame)\r
- continue;\r
- \r
- is_progressive_ = decoded_frame->interlaced_frame == 0;\r
- \r
- // C-TODO: Avoid duplication.\r
- // Need to dupliace frame data since avcodec_decode_video2 reuses it.\r
- send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second)); \r
- Context::Yield();\r
- }\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
+ if(decoded_frame->repeat_pict % 2 > 0)\r
+ CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
\r
- send(target_, target_element_t(eof_video(), ticket_t()));\r
-\r
- done();\r
+ return std::vector<std::shared_ptr<AVFrame>>(1 + decoded_frame->repeat_pict/2, decoded_frame);\r
}\r
-\r
- safe_ptr<AVFrame> dup_frame(const safe_ptr<AVFrame>& frame)\r
+ \r
+ bool ready() const\r
{\r
- auto desc = get_pixel_format_desc(static_cast<PixelFormat>(frame->format), frame->width, frame->height);\r
-\r
- auto count = desc.planes.size();\r
- std::array<uint8_t*, 4> org_ptrs;\r
- std::array<safe_ptr<uint8_t>, 4> new_ptrs;\r
- parallel_for<size_t>(0, count, [&](size_t n)\r
- {\r
- CASPAR_ASSERT(frame->data[n]);\r
- auto size = frame->linesize[n]*desc.planes[n].height;\r
- new_ptrs[n] = fast_memdup(frame->data[n], size);\r
- org_ptrs[n] = frame->data[n];\r
- frame->data[n] = new_ptrs[n].get();\r
- });\r
-\r
- return safe_ptr<AVFrame>(frame.get(), [frame, org_ptrs, new_ptrs, count](AVFrame*)\r
- {\r
- for(size_t n = 0; n < count; ++n)\r
- frame->data[n] = org_ptrs[n];\r
- });\r
+ return !packets_.empty();\r
}\r
- \r
+ \r
double fps() const\r
{\r
return fps_;\r
}\r
};\r
\r
-video_decoder::video_decoder(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
- : impl_(new implementation(source, target, context))\r
-{\r
-}\r
-\r
+video_decoder::video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) : impl_(new implementation(context, frame_factory)){}\r
+void video_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
+std::vector<std::shared_ptr<AVFrame>> video_decoder::poll(){return impl_->poll();}\r
+bool video_decoder::ready() const{return impl_->ready();}\r
double video_decoder::fps() const{return impl_->fps();}\r
int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
size_t video_decoder::width() const{return impl_->width_;}\r
size_t video_decoder::height() const{return impl_->height_;}\r
-bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
-#include "../util.h"\r
-\r
#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
\r
#include <core/video_format.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
-#include <agents.h>\r
-\r
#include <vector>\r
\r
struct AVFormatContext;\r
\r
namespace core {\r
struct frame_factory;\r
+ class write_frame;\r
}\r
\r
namespace ffmpeg {\r
class video_decoder : boost::noncopyable\r
{\r
public:\r
+ explicit video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory);\r
\r
- typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
- typedef std::pair<safe_ptr<AVFrame>, ticket_t> target_element_t;\r
-\r
- typedef Concurrency::ISource<source_element_t> source_t;\r
- typedef Concurrency::ITarget<target_element_t> target_t;\r
+ void push(const std::shared_ptr<AVPacket>& packet);\r
+ bool ready() const;\r
+ std::vector<std::shared_ptr<AVFrame>> poll();\r
\r
- explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
-\r
size_t width() const;\r
size_t height() const;\r
\r
int64_t nb_frames() const;\r
- bool is_progressive() const;\r
\r
double fps() const;\r
private:\r
</decklink>\r
</consumers>\r
</channel>\r
- <channel>\r
- <video-mode>PAL</video-mode>\r
- <consumers>\r
- <bluefish>\r
- <device>1</device>\r
- <embedded-audio>true</embedded-audio>\r
- </bluefish>\r
- </consumers>\r
- </channel>\r
</channels>\r
<controllers>\r
<tcp>\r