]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 26 Oct 2011 19:24:29 +0000 (19:24 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 26 Oct 2011 19:24:29 +0000 (19:24 +0000)
18 files changed:
core/producer/frame_producer.cpp
core/video_channel.cpp
modules/decklink/decklink.cpp
modules/decklink/decklink.vcxproj
modules/decklink/interop/DeckLinkAPI_h.h
modules/decklink/interop/DeckLinkAPI_i.c
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/audio/audio_resampler.cpp
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/input.h
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h
shell/casparcg.config

index b525de62a8e5921fee841dcabdd40f8175c53b81..34da6ded148256a52f99c24d00cb35fbcd5c25c9 100644 (file)
@@ -48,12 +48,17 @@ struct destruction_context
 \r
 void __cdecl destroy_producer(LPVOID lpParam)\r
 {\r
+       static Concurrency::critical_section mutex;\r
        auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
        \r
        try\r
        {               \r
                if(destruction->producer.unique())\r
                {\r
+                       {\r
+                               Concurrency::critical_section::scoped_lock lock(mutex);\r
+                               Concurrency::wait(100);\r
+                       }\r
                        Concurrency::scoped_oversubcription_token oversubscribe;\r
                        CASPAR_LOG(info) << "Destroying: " << destruction->producer->print();\r
                        destruction->producer.reset();\r
index 60fa41eebf3743f9be42b9864a31f785ce6204bd..021cbcd61384297c1c56d800f875f309dd18aeba 100644 (file)
@@ -65,7 +65,7 @@ struct video_channel::implementation : boost::noncopyable
 public:\r
        implementation(int index, const video_format_desc& format_desc, ogl_device& ogl)  \r
                : format_desc_(format_desc)\r
-               , governor_(3)\r
+               , governor_(2)\r
                , output_(new caspar::core::output(mixer_frames_, format_desc))\r
                , mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
                , stage_(new caspar::core::stage(stage_frames_, governor_))     \r
index 94a8ba5561efc4509e51bae01f05296d535b1a01..dd76a653d057b17702f6dbff36070aabdf31b519 100644 (file)
@@ -23,7 +23,7 @@
 #include "util/util.h"\r
 \r
 #include "consumer/decklink_consumer.h"\r
-#include "producer/decklink_producer.h"\r
+//#include "producer/decklink_producer.h"\r
 \r
 #include <core/consumer/frame_consumer.h>\r
 #include <core/producer/frame_producer.h>\r
@@ -45,7 +45,7 @@ namespace caspar { namespace decklink {
 void init()\r
 {\r
        core::register_consumer_factory([](const std::vector<std::wstring>& params){return create_consumer(params);});\r
-       core::register_producer_factory(create_producer);\r
+       //core::register_producer_factory(create_producer);\r
 }\r
 \r
 std::wstring get_version() \r
index 754dd4ab9ca0556e2e15cc2092b0e57264efdd66..67e24980788d5d694b122738d5d9b495ac3a7e13 100644 (file)
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">true</ExcludedFromBuild>\r
+      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>\r
+      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">true</ExcludedFromBuild>\r
+      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>\r
     </ClCompile>\r
     <ClCompile Include="StdAfx.cpp">\r
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">Create</PrecompiledHeader>\r
index 6c6ea4f2d058b04c9be23ae68b644f49549d8bb0..7ed6b0ae601041f995bf301343615eb037ef2d2a 100644 (file)
@@ -4,7 +4,7 @@
 \r
 \r
  /* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sun Oct 23 22:39:56 2011\r
+/* at Wed Oct 26 20:20:42 2011\r
  */\r
 /* Compiler settings for interop\DeckLinkAPI.idl:\r
     Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
index c0bbdf7694544834dc2b52fa5937c707fbd08197..84261f4cc12bde261edf920d2b97a585bb1464dd 100644 (file)
@@ -6,7 +6,7 @@
 \r
 \r
  /* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sun Oct 23 22:39:56 2011\r
+/* at Wed Oct 26 20:20:42 2011\r
  */\r
 /* Compiler settings for interop\DeckLinkAPI.idl:\r
     Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
index 600a8b2d87481a45d53a0d0a6c603587b6cacb48..77ccd596f33458dec496245261964dec86be7ae2 100644 (file)
@@ -208,8 +208,7 @@ public:
 \r
                try\r
                {\r
-                       auto frame_element = Concurrency::receive(muxed_frames_);\r
-                       last_frame_ = frame = frame_element.first;\r
+                       last_frame_ = frame = Concurrency::receive(muxed_frames_, 10);\r
                }\r
                catch(Concurrency::operation_timed_out&)\r
                {               \r
@@ -278,7 +277,7 @@ public:
                                Concurrency::parallel_invoke(\r
                                [&]\r
                                {\r
-                                       Concurrency::send(video_frames_, ffmpeg::frame_muxer2::video_source_element_t(av_frame, ticket_t()));                                   \r
+                                       Concurrency::send(video_frames_, av_frame);                                     \r
                                },\r
                                [&]\r
                                {                                                                                                       \r
@@ -287,10 +286,10 @@ public:
                                        {\r
                                                auto sample_frame_count = audio->GetSampleFrameCount();\r
                                                auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), ticket_t()));\r
+                                               Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
                                        }\r
                                        else\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0), ticket_t()));    \r
+                                               Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0));      \r
                                });\r
                        }\r
 \r
index 7595c34dc2fda7a5f5000b197b82baadf6d916df..71faa00066cd4ec7ec48a22867945c47d4307686 100644 (file)
@@ -20,6 +20,7 @@
 #include "../../stdafx.h"\r
 \r
 #include "audio_decoder.h"\r
+\r
 #include "audio_resampler.h"\r
 \r
 #include "../util.h"\r
@@ -29,6 +30,8 @@
 \r
 #include <tbb/cache_aligned_allocator.h>\r
 \r
+#include <queue>\r
+\r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
 #pragma warning (disable : 4244)\r
@@ -42,108 +45,92 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#undef Yield\r
-using namespace Concurrency;\r
-\r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct audio_decoder::implementation : public agent, boost::noncopyable\r
+struct audio_decoder::implementation : boost::noncopyable\r
 {      \r
        int                                                                                                                     index_;\r
        std::shared_ptr<AVCodecContext>                                                         codec_context_;         \r
-       \r
+       const core::video_format_desc                                                           format_desc_;\r
        audio_resampler                                                                                         resampler_;\r
-       \r
+\r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
 \r
-       unbounded_buffer<audio_decoder::source_element_t>                       source_;\r
-       ITarget<audio_decoder::target_element_t>&                                       target_;\r
-       \r
+       std::queue<std::shared_ptr<AVPacket>>                                           packets_;\r
 public:\r
-       explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
-               : codec_context_(open_codec(context, AVMEDIA_TYPE_AUDIO, index_))\r
-               , resampler_(format_desc.audio_channels,        codec_context_->channels,\r
-                                        format_desc.audio_sample_rate, codec_context_->sample_rate,\r
-                                        AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
+       explicit implementation(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
+               : codec_context_(open_codec(*context, AVMEDIA_TYPE_AUDIO, index_))\r
+               , format_desc_(format_desc)     \r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , source_([this](const audio_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
-               , target_(target)\r
-       {                               \r
-               CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
-\r
-               source.link_target(&source_);\r
-\r
-               start();\r
+               , resampler_(format_desc_.audio_channels,    codec_context_->channels,\r
+                                                                                                format_desc_.audio_sample_rate, codec_context_->sample_rate,\r
+                                                                                                AV_SAMPLE_FMT_S32,                              codec_context_->sample_fmt)\r
+       {                          \r
        }\r
 \r
-       ~implementation()\r
-       {\r
-               agent::wait(this);\r
-       }\r
+       void push(const std::shared_ptr<AVPacket>& packet)\r
+       {                       \r
+               if(packet && packet->stream_index != index_)\r
+                       return;\r
 \r
-       virtual void run()\r
+               packets_.push(packet);\r
+       }       \r
+       \r
+       std::vector<std::shared_ptr<core::audio_buffer>> poll()\r
        {\r
-               try\r
+               std::vector<std::shared_ptr<core::audio_buffer>> result;\r
+\r
+               if(packets_.empty())\r
+                       return result;\r
+                               \r
+               auto packet = packets_.front();\r
+\r
+               if(packet)              \r
                {\r
-                       while(true)\r
-                       {                               \r
-                               auto element = receive(source_);\r
-                               auto packet = element.first;\r
-                       \r
-                               if(packet == loop_packet(index_))\r
-                               {\r
-                                       avcodec_flush_buffers(codec_context_.get());\r
-                                       send(target_, audio_decoder::target_element_t(loop_audio(), ticket_t()));\r
-                                       continue;\r
-                               }\r
-\r
-                               if(packet == eof_packet(index_))\r
-                                       break;\r
-\r
-                               auto result = std::make_shared<core::audio_buffer>();\r
-\r
-                               while(packet->size > 0)\r
-                               {\r
-                                       buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
-                                       int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+                       result.push_back(decode(*packet));\r
+                       if(packet->size == 0)                                   \r
+                               packets_.pop();\r
+               }\r
+               else                    \r
+               {       \r
+                       avcodec_flush_buffers(codec_context_.get());\r
+                       result.push_back(nullptr);\r
+                       packets_.pop();\r
+               }               \r
+\r
+               return result;\r
+       }\r
+       \r
+       std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
+       {               \r
+               buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+               int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
                \r
-                                       int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+               int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, &pkt), "[audio_decoder]");\r
 \r
-                                       // There might be several frames in one packet.\r
-                                       packet->size -= ret;\r
-                                       packet->data += ret;\r
+               // There might be several frames in one packet.\r
+               pkt.size -= ret;\r
+               pkt.data += ret;\r
                        \r
-                                       buffer1_.resize(written_bytes);\r
+               buffer1_.resize(written_bytes);\r
 \r
-                                       buffer1_ = resampler_.resample(std::move(buffer1_));\r
+               buffer1_ = resampler_.resample(std::move(buffer1_));\r
                \r
-                                       const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
-                                       const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
-\r
-                                       send(target_, audio_decoder::target_element_t(make_safe<core::audio_buffer>(samples, samples + n_samples), element.second));\r
-                                       Context::Yield();\r
-                               }\r
-                       }\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }\r
+               const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+               const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
 \r
-               send(target_, audio_decoder::target_element_t(eof_audio(), ticket_t()));\r
+               return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
+       }\r
 \r
-               done();\r
+       bool ready() const\r
+       {\r
+               return !packets_.empty();\r
        }\r
 };\r
 \r
-audio_decoder::audio_decoder(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
-       : impl_(new implementation(source, target, context, format_desc))\r
-{\r
-}\r
-\r
-int64_t audio_decoder::nb_frames() const\r
-{\r
-       return 0;\r
-}\r
+audio_decoder::audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){}\r
+void audio_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
+bool audio_decoder::ready() const{return impl_->ready();}\r
+std::vector<std::shared_ptr<core::audio_buffer>> audio_decoder::poll(){return impl_->poll();}\r
 \r
 }}
\ No newline at end of file
index 19da2325b1c74d4f53f7e2159f2718a2454b39ad..b786b188d4b08204c994818c3a14764e902faa79 100644 (file)
 */\r
 #pragma once\r
 \r
-#include "../util.h"\r
-\r
 #include <core/mixer/audio/audio_mixer.h>\r
 \r
 #include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
-#include <agents.h>\r
 #include <vector>\r
 \r
 struct AVPacket;\r
@@ -47,19 +43,13 @@ namespace ffmpeg {
 class audio_decoder : boost::noncopyable\r
 {\r
 public:\r
-\r
-       typedef std::pair<safe_ptr<AVPacket>, ticket_t>                         source_element_t;\r
-       typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t>       target_element_t;\r
-\r
-       typedef Concurrency::ISource<source_element_t>&                         source_t;\r
-       typedef Concurrency::ITarget<target_element_t>&                         target_t;\r
+       explicit audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
        \r
-       explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
+       void push(const std::shared_ptr<AVPacket>& packet);\r
+       bool ready() const;\r
+       std::vector<std::shared_ptr<core::audio_buffer>> poll();\r
        \r
-       int64_t nb_frames() const;\r
-\r
 private:\r
-       \r
        struct implementation;\r
        safe_ptr<implementation> impl_;\r
 };\r
index 2c46dd2918a9a33333d4a7b4fff0f7e7fdb1decc..179d09cc589e92408df493f59270451deb75104a 100644 (file)
@@ -22,6 +22,7 @@ struct audio_resampler::implementation
 {      \r
        std::shared_ptr<ReSampleContext> resampler_;\r
        \r
+       std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> copy_buffer_;\r
        std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer2_;\r
 \r
        const size_t                    output_channels_;\r
@@ -52,15 +53,16 @@ struct audio_resampler::implementation
                                                                        L" audio_channels:" << input_channels  <<\r
                                                                        L" sample_fmt:" << input_sample_format;\r
 \r
-                       CASPAR_VERIFY(resampler, caspar_exception());\r
-\r
-                       resampler_.reset(resampler, audio_resample_close);\r
+                       if(resampler)\r
+                               resampler_.reset(resampler, audio_resample_close);\r
+                       else\r
+                               BOOST_THROW_EXCEPTION(caspar_exception());\r
                }               \r
        }\r
 \r
        std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> resample(std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>>&& data)\r
        {\r
-               if(resampler_ && !data.empty())\r
+               if(resampler_)\r
                {\r
                        buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
                        auto ret = audio_resample(resampler_.get(),\r
index afb33bc95042182dd2a54a944b0678591b1adab9..ca99689505f4c9626d85a0747b3eb9727d867748 100644 (file)
@@ -26,7 +26,6 @@
 #include "util.h"\r
 #include "audio/audio_decoder.h"\r
 #include "video/video_decoder.h"\r
-#include "../ffmpeg_error.h"\r
 \r
 #include <common/env.h>\r
 #include <common/utility/assert.h>\r
 #include <boost/range/algorithm/find_if.hpp>\r
 #include <boost/range/algorithm/find.hpp>\r
 \r
-#include <agents.h>\r
+#include <ppl.h>\r
 \r
-#include <iterator>\r
-#include <vector>\r
-#include <string>\r
+#include <agents.h>\r
 \r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
-               \r
+                               \r
 struct ffmpeg_producer : public core::frame_producer\r
-{      \r
-       const std::wstring                                                                                                              filename_;\r
-       const int                                                                                                                               start_;\r
-       const bool                                                                                                                              loop_;\r
-       const size_t                                                                                                                    length_;\r
+{\r
+       const safe_ptr<diagnostics::graph>                              graph_;\r
+\r
+       const std::wstring                                                              filename_;\r
        \r
-       call<input::target_element_t>                                                                                   throw_away_;\r
-       unbounded_buffer<input::target_element_t>                                                               packets_;\r
-       std::shared_ptr<unbounded_buffer<frame_muxer2::video_source_element_t>> video_;\r
-       std::shared_ptr<unbounded_buffer<frame_muxer2::audio_source_element_t>> audio_;\r
-       unbounded_buffer<frame_muxer2::target_element_t>                                                frames_;\r
-               \r
-       const safe_ptr<diagnostics::graph>                                                                              graph_;\r
+       boost::timer                                                                    frame_timer_;\r
+       boost::timer                                                                    video_timer_;\r
+       boost::timer                                                                    audio_timer_;\r
                                        \r
-       input                                                                                                                                   input_; \r
-       std::unique_ptr<video_decoder>                                                                                  video_decoder_;\r
-       std::unique_ptr<audio_decoder>                                                                                  audio_decoder_; \r
-       std::unique_ptr<frame_muxer2>                                                                                   muxer_;\r
+       const safe_ptr<core::frame_factory>                             frame_factory_;\r
+       const core::video_format_desc                                   format_desc_;\r
+       \r
+       unbounded_buffer<std::shared_ptr<AVPacket>>             packets_;\r
+       input                                                                                   input_; \r
+       video_decoder                                                                   video_decoder_;\r
+       audio_decoder                                                                   audio_decoder_; \r
+       double                                                                                  fps_;\r
+       frame_muxer                                                                             muxer_;\r
+\r
+       const int                                                                               start_;\r
+       const bool                                                                              loop_;\r
+       const size_t                                                                    length_;\r
+\r
+       safe_ptr<core::basic_frame>                                             last_frame_;\r
+\r
+       const size_t                                                                    width_;\r
+       const size_t                                                                    height_;\r
+       bool                                                                                    is_progressive_;\r
 \r
-       safe_ptr<core::basic_frame>                                                                                             last_frame_;\r
        \r
 public:\r
        explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
                : filename_(filename)\r
+               , frame_factory_(frame_factory)         \r
+               , format_desc_(frame_factory->get_video_format_desc())\r
+               , input_(packets_, graph_, filename_, loop, start, length)\r
+               , video_decoder_(input_.context(), frame_factory)\r
+               , audio_decoder_(input_.context(), frame_factory->get_video_format_desc())\r
+               , fps_(video_decoder_.fps())\r
+               , muxer_(fps_, frame_factory, filter)\r
                , start_(start)\r
                , loop_(loop)\r
                , length_(length)\r
-               , throw_away_([](const input::target_element_t&){})\r
-               , input_(packets_, graph_, filename_, loop, start, length)\r
                , last_frame_(core::basic_frame::empty())\r
-       {               \r
-               try\r
-               {\r
-                       auto video = std::make_shared<unbounded_buffer<frame_muxer2::video_source_element_t>>();\r
-                       video_decoder_.reset(new video_decoder(packets_, *video, *input_.context()));\r
-                       video_ = video;\r
-               }\r
-               catch(averror_stream_not_found&)\r
-               {\r
-                       CASPAR_LOG(warning) << "No video-stream found. Running without video."; \r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       CASPAR_LOG(warning) << "Failed to open video-stream. Running without video.";   \r
-               }\r
-\r
-               try\r
-               {\r
-                       auto audio = std::make_shared<unbounded_buffer<frame_muxer2::audio_source_element_t>>();\r
-                       audio_decoder_.reset(new audio_decoder(packets_, *audio, *input_.context(), frame_factory->get_video_format_desc()));\r
-                       audio_ = audio;\r
-               }\r
-               catch(averror_stream_not_found&)\r
-               {\r
-                       CASPAR_LOG(warning) << "No audio-stream found. Running without video."; \r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       CASPAR_LOG(warning) << "Failed to open audio-stream. Running without audio.";           \r
-               }               \r
-\r
-               CASPAR_VERIFY(video_decoder_ || audio_decoder_, ffmpeg_error());\r
-               \r
-               packets_.link_target(&throw_away_);\r
-               muxer_.reset(new frame_muxer2(video_.get(), audio_.get(), frames_, video_decoder_ ? video_decoder_->fps() : frame_factory->get_video_format_desc().fps, frame_factory));\r
-                               \r
+               , width_(video_decoder_.width())\r
+               , height_(video_decoder_.height())\r
+               , is_progressive_(true)\r
+       {\r
+               graph_->add_guide("frame-time", 0.5);\r
+               graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
                graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f));   \r
-               graph_->set_text(print());\r
                diagnostics::register_graph(graph_);\r
-\r
-               input_.start();\r
        }\r
 \r
        ~ffmpeg_producer()\r
        {\r
-               input_.stop();  \r
+               input_.stop();\r
        }\r
-                                               \r
+                       \r
        virtual safe_ptr<core::basic_frame> receive(int hints)\r
        {\r
                auto frame = core::basic_frame::late();\r
                \r
-               try\r
-               {               \r
-                       auto frame_element = Concurrency::receive(frames_, 10);\r
-                       frame = last_frame_ = frame_element.first;\r
-                       graph_->set_text(narrow(print()));\r
-               }\r
-               catch(operation_timed_out&)\r
-               {               \r
-                       graph_->add_tag("underflow");   \r
+               frame_timer_.restart();\r
+               \r
+               for(int n = 0; n < 64 && muxer_.empty(); ++n)\r
+                       decode_frame(hints);\r
+               \r
+               graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
+\r
+               if(!muxer_.empty())\r
+                       frame = last_frame_ = muxer_.pop();     \r
+               else\r
+               {\r
+                       if(input_.eof())\r
+                               return core::basic_frame::eof();\r
+                       else                    \r
+                               graph_->add_tag("underflow");   \r
                }\r
 \r
+               graph_->set_text(narrow(print()));\r
+               \r
                return frame;\r
        }\r
 \r
@@ -158,8 +142,51 @@ public:
        {\r
                return disable_audio(last_frame_);\r
        }\r
-       \r
-       virtual int64_t nb_frames() const\r
+\r
+       void push_packets()\r
+       {\r
+               for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
+               {\r
+                       std::shared_ptr<AVPacket> pkt;\r
+                       if(try_receive(packets_, pkt))\r
+                       {\r
+                               video_decoder_.push(pkt);\r
+                               audio_decoder_.push(pkt);\r
+                       }\r
+               }\r
+       }\r
+\r
+       void decode_frame(int hints)\r
+       {\r
+               push_packets();\r
+               \r
+               parallel_invoke(\r
+               [&]\r
+               {\r
+                       if(muxer_.video_ready())\r
+                               return;\r
+\r
+                       auto video_frames = video_decoder_.poll();\r
+                       BOOST_FOREACH(auto& video, video_frames)        \r
+                       {\r
+                               is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
+                               muxer_.push(video, hints);      \r
+                       }\r
+               },\r
+               [&]\r
+               {\r
+                       if(muxer_.audio_ready())\r
+                               return;\r
+                                       \r
+                       auto audio_samples = audio_decoder_.poll();\r
+                       BOOST_FOREACH(auto& audio, audio_samples)\r
+                               muxer_.push(audio);                             \r
+               });\r
+\r
+               muxer_.commit();\r
+       }\r
+\r
+       virtual int64_t nb_frames() const \r
        {\r
                if(loop_)\r
                        return std::numeric_limits<int64_t>::max();\r
@@ -169,39 +196,24 @@ public:
                int64_t nb_frames = input_.nb_frames();\r
                if(input_.nb_loops() < 1) // input still hasn't counted all frames\r
                {\r
-                       int64_t video_nb_frames = video_decoder_->nb_frames();\r
-                       int64_t audio_nb_frames = audio_decoder_->nb_frames();\r
+                       int64_t video_nb_frames = video_decoder_.nb_frames();\r
 \r
-                       nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, std::max(video_nb_frames, audio_nb_frames)));\r
+                       nb_frames = std::min(static_cast<int64_t>(length_), std::max(nb_frames, video_nb_frames));\r
                }\r
 \r
-               nb_frames = muxer_->calc_nb_frames(nb_frames);\r
-               \r
-               return nb_frames - start_;\r
-       }\r
+               nb_frames = muxer_.calc_nb_frames(nb_frames);\r
 \r
-       virtual void param(const std::wstring& param)\r
-       {\r
-               typedef std::istream_iterator<std::wstring, wchar_t, std::char_traits<wchar_t>> wistream_iterator;\r
-               std::wstringstream str(param);\r
-               std::vector<std::wstring> params;\r
-               std::copy(wistream_iterator(str), wistream_iterator(), std::back_inserter(params));\r
+               // TODO: Might need to scale nb_frames av frame_muxer transformations.\r
 \r
-               if(boost::iequals(params.at(0), L"LOOP"))\r
-                       input_.loop(boost::lexical_cast<bool>(params.at(1)));\r
+               return nb_frames - start_;\r
        }\r
                                \r
        virtual std::wstring print() const\r
        {\r
-               if(video_decoder_)\r
-               {\r
-                       return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
-                                                         + boost::lexical_cast<std::wstring>(video_decoder_->width()) + L"x" + boost::lexical_cast<std::wstring>(video_decoder_->height())\r
-                                                         + (video_decoder_->is_progressive() ? L"p" : L"i")  + boost::lexical_cast<std::wstring>(video_decoder_->is_progressive() ? video_decoder_->fps() : 2.0 * video_decoder_->fps())\r
-                                                         + L"]";\r
-               }\r
-               \r
-               return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"]";\r
+               return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
+                                                 + boost::lexical_cast<std::wstring>(width_) + L"x" + boost::lexical_cast<std::wstring>(height_)\r
+                                                 + (is_progressive_ ? L"p" : L"i")  + boost::lexical_cast<std::wstring>(is_progressive_ ? fps_ : 2.0 * fps_)\r
+                                                 + L"]";\r
        }\r
 };\r
 \r
index 9a5444ed9cb02ee1c2ff34a5a56e047b400c7d7f..420b12036178a24659c508969c69f22dae6067bc 100644 (file)
@@ -9,6 +9,8 @@
 \r
 #include <core/producer/frame_producer.h>\r
 #include <core/producer/frame/basic_frame.h>\r
+#include <core/producer/frame/frame_transform.h>\r
+#include <core/producer/frame/pixel_format.h>\r
 #include <core/producer/frame/frame_factory.h>\r
 #include <core/mixer/write_frame.h>\r
 \r
@@ -16,8 +18,6 @@
 #include <common/exception/exceptions.h>\r
 #include <common/log/log.h>\r
 \r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-\r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
 #pragma warning (disable : 4244)\r
@@ -33,206 +33,248 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <agents.h>\r
+#include <boost/foreach.hpp>\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+\r
+#include <deque>\r
+#include <queue>\r
+#include <vector>\r
 \r
-using namespace Concurrency;\r
+using namespace caspar::core;\r
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
-{              \r
-       typedef std::pair<safe_ptr<AVFrame>, ticket_t>                          video_element_t;\r
-       typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t>       audio_element_t;\r
-\r
-       frame_muxer2::video_source_t* video_source_;\r
-       frame_muxer2::audio_source_t* audio_source_;\r
-\r
-       ITarget<frame_muxer2::target_element_t>&                target_;\r
-       mutable single_assignment<display_mode::type>   display_mode_;\r
+struct frame_muxer::implementation : boost::noncopyable\r
+{      \r
+       std::deque<std::queue<safe_ptr<write_frame>>>   video_streams_;\r
+       std::deque<core::audio_buffer>                                  audio_streams_;\r
+       std::deque<safe_ptr<basic_frame>>                               frame_buffer_;\r
+       display_mode::type                                                              display_mode_;\r
        const double                                                                    in_fps_;\r
-       const core::video_format_desc                                   format_desc_;\r
-       const bool                                                                              auto_transcode_;\r
-       \r
-       mutable single_assignment<safe_ptr<filter>>             filter_;\r
-       const safe_ptr<core::frame_factory>                             frame_factory_;\r
-                       \r
-       core::audio_buffer                                                              audio_data_;\r
-       \r
-       std::queue<video_element_t>                                             video_frames_;\r
-       std::queue<audio_element_t>                                             audio_buffers_;\r
+       const video_format_desc                                                 format_desc_;\r
+       bool                                                                                    auto_transcode_;\r
 \r
+       size_t                                                                                  audio_sample_count_;\r
+       size_t                                                                                  video_frame_count_;\r
+               \r
+       size_t                                                                                  processed_audio_sample_count_;\r
+       size_t                                                                                  processed_video_frame_count_;\r
+\r
+       filter                                                                                  filter_;\r
+       safe_ptr<core::frame_factory>                                   frame_factory_;\r
        std::wstring                                                                    filter_str_;\r
-       bool                                                                                    eof_;\r
-       \r
-       implementation(frame_muxer2::video_source_t* video_source,\r
-                                  frame_muxer2::audio_source_t* audio_source,\r
-                                  frame_muxer2::target_t& target,\r
-                                  double in_fps, \r
-                                  const safe_ptr<core::frame_factory>& frame_factory,\r
-                                  const std::wstring& filter)\r
-               : video_source_(video_source)\r
-               , audio_source_(audio_source)\r
-               , target_(target)\r
+               \r
+       implementation(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+               : video_streams_(1)\r
+               , audio_streams_(1)\r
+               , display_mode_(display_mode::invalid)\r
                , in_fps_(in_fps)\r
                , format_desc_(frame_factory->get_video_format_desc())\r
                , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
+               , audio_sample_count_(0)\r
+               , video_frame_count_(0)\r
                , frame_factory_(frame_factory)\r
-               , eof_(false)\r
-       {               \r
-               start();\r
-       }\r
-\r
-       ~implementation()\r
+               , filter_str_(filter_str)\r
        {\r
-               agent::wait(this);\r
        }\r
-                               \r
-       safe_ptr<core::write_frame> receive_video(ticket_t& tickets)\r
-       {       \r
-               if(!video_source_)\r
-                       return make_safe<core::write_frame>(this);      \r
 \r
-               if(!video_frames_.empty())\r
+       void push(const std::shared_ptr<AVFrame>& video_frame, int hints)\r
+       {               \r
+               if(!video_frame)\r
+               {       \r
+                       CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
+                       video_frame_count_ = 0;\r
+                       video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
+                       return;\r
+               }\r
+\r
+               if(video_frame->data[0] == nullptr)\r
                {\r
-                       auto video_frame = std::move(video_frames_.front());\r
-                       video_frames_.pop();\r
-                       boost::range::push_back(tickets, video_frame.second);\r
-                       return make_write_frame(this, video_frame.first, frame_factory_, 0);\r
+                       video_streams_.back().push(make_safe<core::write_frame>(this));\r
+                       ++video_frame_count_;\r
+                       display_mode_ = display_mode::simple;\r
+                       return;\r
                }\r
 \r
-               auto element = receive(video_source_);\r
-               auto video       = element.first;\r
+               if(display_mode_ == display_mode::invalid)\r
+                       initialize_display_mode(*video_frame);\r
+                                               \r
+               if(hints & core::frame_producer::ALPHA_HINT)\r
+                       video_frame->format = make_alpha_format(video_frame->format);\r
                \r
-               if(video == loop_video())\r
-                       return receive_video(tickets);\r
+               auto format = video_frame->format;\r
+               if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+                       video_frame->format = PIX_FMT_GRAY8;\r
 \r
-               if(eof_ || video == eof_video())\r
+               filter_.push(video_frame);\r
+               BOOST_FOREACH(auto& av_frame, filter_.poll_all())\r
                {\r
-                       eof_ = true;\r
-                       return make_safe<core::write_frame>(this);              \r
-               }       \r
+                       av_frame->format = format;\r
+                                               \r
+                       video_streams_.back().push(make_write_frame(this, av_frame, frame_factory_, hints));\r
+                       ++video_frame_count_;\r
+               }\r
 \r
-               if(!display_mode_.has_value())\r
-                       initialize_display_mode(*video);\r
-                       \r
-               filter_.value()->push(video);\r
-               for(auto frame = filter_.value()->poll(); frame; frame = filter_.value()->poll())                       \r
-                       video_frames_.push(video_element_t(make_safe_ptr(frame), element.second));              \r
-               \r
-               return receive_video(tickets);\r
+               if(video_streams_.back().size() > 8)\r
+                       BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
        }\r
-       \r
-       std::shared_ptr<core::audio_buffer> receive_audio(ticket_t& tickets)\r
-       {               \r
-               if(!audio_source_)\r
-                       return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
 \r
-               if(!audio_buffers_.empty())\r
+       void push(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
+       {\r
+               if(!audio_samples)      \r
                {\r
-                       auto audio_buffer = std::move(audio_buffers_.front());\r
-                       audio_buffers_.pop();\r
-                       boost::range::push_back(tickets, audio_buffer.second);\r
-                       return audio_buffer.first;\r
+                       CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
+                       audio_streams_.push_back(core::audio_buffer());\r
+                       audio_sample_count_ = 0;\r
+                       return;\r
                }\r
+\r
+               audio_sample_count_ += audio_samples->size();\r
+\r
+               boost::range::push_back(audio_streams_.back(), *audio_samples);\r
+\r
+               if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
+                       BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+       }\r
+\r
+       safe_ptr<basic_frame> pop()\r
+       {               \r
+               auto frame = frame_buffer_.front();\r
+               frame_buffer_.pop_front();              \r
+               return frame;\r
+       }\r
+\r
+       size_t size() const\r
+       {\r
+               return frame_buffer_.size();\r
+       }\r
+\r
+       safe_ptr<core::write_frame> pop_video()\r
+       {\r
+               auto frame = video_streams_.front().front();\r
+               video_streams_.front().pop();\r
                \r
-               auto element = receive(audio_source_);\r
-               auto audio       = element.first;\r
+               return frame;\r
+       }\r
 \r
-               if(audio == loop_audio())\r
-               {\r
-                       if(!audio_data_.empty())\r
-                       {\r
-                               CASPAR_LOG(info) << L"[frame_muxer] Truncating audio: " << audio_data_.size();\r
-                               audio_data_.clear();\r
-                       }\r
-                       return receive_audio(tickets);\r
-               }\r
+       core::audio_buffer pop_audio()\r
+       {\r
+               auto begin = audio_streams_.front().begin();\r
+               auto end   = begin + format_desc_.audio_samples_per_frame;\r
 \r
-               if(eof_ || audio == eof_audio())\r
-               {\r
-                       eof_ = true;\r
-                       return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
-               }               \r
-                       \r
-               audio_data_.insert(audio_data_.end(), audio->begin(), audio->end());            \r
-               while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
+               auto samples = core::audio_buffer(begin, end);\r
+               audio_streams_.front().erase(begin, end);\r
+\r
+               return samples;\r
+       }\r
+       \r
+       bool video_ready() const\r
+       {               \r
+               return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2());\r
+       }\r
+       \r
+       bool audio_ready() const\r
+       {\r
+               return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2());\r
+       }\r
+\r
+       bool video_ready2() const\r
+       {               \r
+               switch(display_mode_)\r
                {\r
-                       auto begin = audio_data_.begin(); \r
-                       auto end   = begin + format_desc_.audio_samples_per_frame;\r
-                       auto audio = make_safe<core::audio_buffer>(begin, end);\r
-                       audio_data_.erase(begin, end);\r
-                       audio_buffers_.push(frame_muxer2::audio_source_element_t(audio, element.second));\r
+               case display_mode::deinterlace_bob_reinterlace:                                 \r
+               case display_mode::interlace:                                   \r
+                       return video_streams_.front().size() >= 2;\r
+               default:                                                                                \r
+                       return !video_streams_.front().empty();\r
                }\r
-\r
-               return receive_audio(tickets);\r
        }\r
-                       \r
-       virtual void run()\r
+       \r
+       bool audio_ready2() const\r
        {\r
-               try\r
+               switch(display_mode_)\r
                {\r
-                       while(!eof_)\r
-                       {\r
-                               ticket_t tickets;\r
-                               \r
-                               auto video = receive_video(tickets);\r
-                               video->audio_data() = std::move(*receive_audio(tickets));\r
-\r
-                               if(eof_)\r
-                                       break;\r
-\r
-                               switch(display_mode_.value())\r
-                               {\r
-                               case display_mode::simple:                      \r
-                               case display_mode::deinterlace:\r
-                               case display_mode::deinterlace_bob:\r
-                                       {\r
-                                               send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets)));\r
-\r
-                                               break;\r
-                                       }\r
-                               case display_mode::duplicate:                                   \r
-                                       {                                               \r
-                                               auto video2 = make_safe<core::write_frame>(*video);     \r
-                                               video2->audio_data() = std::move(*receive_audio(tickets));\r
-\r
-                                               send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets)));                                            \r
-                                               send(target_, frame_muxer2::target_element_t(std::move(video2), std::move(tickets)));\r
-\r
-                                               break;\r
-                                       }\r
-                               case display_mode::half:                                                \r
-                                       {                                                               \r
-                                               send(target_, frame_muxer2::target_element_t(std::move(video), std::move(tickets)));\r
-                                               receive_video(tickets);\r
-\r
-                                               break;\r
-                                       }\r
-                               case display_mode::deinterlace_bob_reinterlace:\r
-                               case display_mode::interlace:                                   \r
-                                       {                                       \r
-                                               auto video2 = receive_video(tickets);\r
-                                                                                               \r
-                                               auto frame = core::basic_frame::interlace(std::move(video), std::move(video2), format_desc_.field_mode);        \r
-                                               send(target_, frame_muxer2::target_element_t(std::move(frame), std::move(tickets)));\r
-\r
-                                               break;\r
-                                       }\r
-                               default:        \r
-                                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
-                               }\r
-                       }       \r
+               case display_mode::duplicate:                                   \r
+                       return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame;\r
+               default:                                                                                \r
+                       return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame;\r
                }\r
-               catch(...)\r
+       }\r
+               \r
+       void commit()\r
+       {\r
+               if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2()))\r
                {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
+                               CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
+\r
+                       video_streams_.pop_front();\r
+                       audio_streams_.pop_front();\r
                }\r
+\r
+               if(!video_ready2() || !audio_ready2())\r
+                       return;\r
                \r
-               send(target_, frame_muxer2::target_element_t(core::basic_frame::eof(), ticket_t()));\r
+               switch(display_mode_)\r
+               {\r
+               case display_mode::simple:\r
+               case display_mode::deinterlace_bob:\r
+               case display_mode::deinterlace:                                 \r
+                       return simple(frame_buffer_);\r
+               case display_mode::duplicate:                                   \r
+                       return duplicate(frame_buffer_);\r
+               case display_mode::half:                                                \r
+                       return half(frame_buffer_);\r
+               case display_mode::interlace:\r
+               case display_mode::deinterlace_bob_reinterlace: \r
+                       return interlace(frame_buffer_);\r
+               default:                                                                                \r
+                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+               }\r
+       }\r
+       \r
+       void simple(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {               \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+\r
+               dest.push_back(frame1);         \r
+       }\r
+\r
+       void duplicate(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {               \r
+               auto frame = pop_video();\r
+\r
+               auto frame1 = make_safe<core::write_frame>(*frame); // make a copy\r
+               frame1->audio_data() = pop_audio();\r
+\r
+               auto frame2 = frame;\r
+               frame2->audio_data() = pop_audio();\r
+\r
+               dest.push_back(frame1);\r
+               dest.push_back(frame2);\r
+       }\r
+\r
+       void half(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {                                                       \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+                               \r
+               video_streams_.front().pop(); // Throw away\r
 \r
-               done();\r
+               dest.push_back(frame1);\r
        }\r
+       \r
+       void interlace(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {                               \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+                               \r
+               auto frame2 = pop_video();\r
 \r
+               dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode));          \r
+       }\r
+               \r
        void initialize_display_mode(AVFrame& frame)\r
        {\r
                auto display_mode = display_mode::invalid;\r
@@ -267,16 +309,17 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        display_mode = display_mode::simple;\r
                }\r
                        \r
-               send(filter_, make_safe<filter>(filter_str_));\r
+               filter_ = filter(filter_str_);\r
 \r
                CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode);\r
 \r
-               send(display_mode_, display_mode);\r
+               display_mode_ = display_mode;\r
        }\r
-                                       \r
+               \r
+\r
        int64_t calc_nb_frames(int64_t nb_frames) const\r
        {\r
-               switch(display_mode_.value()) // Take into account transformation in run.\r
+               switch(display_mode_) // Take into account transformation in run.\r
                {\r
                case display_mode::deinterlace_bob_reinterlace:\r
                case display_mode::interlace:   \r
@@ -288,26 +331,23 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        break;\r
                }\r
 \r
-               if(is_double_rate(widen(filter_.value()->filter_str()))) // Take into account transformations in filter.\r
+               if(is_double_rate(widen(filter_.filter_str()))) // Take into account transformations in filter.\r
                        nb_frames *= 2;\r
 \r
                return nb_frames;\r
        }\r
 };\r
 \r
-frame_muxer2::frame_muxer2(video_source_t* video_source, \r
-                                                  audio_source_t* audio_source,\r
-                                                  target_t& target,\r
-                                                  double in_fps, \r
-                                                  const safe_ptr<core::frame_factory>& frame_factory,\r
-                                                  const std::wstring& filter)\r
-       : impl_(new implementation(video_source, audio_source, target, in_fps, frame_factory, filter))\r
-{\r
-}\r
-\r
-int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
-{\r
-       return impl_->calc_nb_frames(nb_frames);\r
-}\r
+frame_muxer::frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+       : impl_(new implementation(in_fps, frame_factory, filter_str)){}\r
+void frame_muxer::push(const std::shared_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
+void frame_muxer::push(const std::shared_ptr<core::audio_buffer>& audio_samples){return impl_->push(audio_samples);}\r
+void frame_muxer::commit(){impl_->commit();}\r
+safe_ptr<basic_frame> frame_muxer::pop(){return impl_->pop();}\r
+size_t frame_muxer::size() const {return impl_->size();}\r
+bool frame_muxer::empty() const {return impl_->size() == 0;}\r
+bool frame_muxer::video_ready() const{return impl_->video_ready();}\r
+bool frame_muxer::audio_ready() const{return impl_->audio_ready();}\r
+int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
 \r
 }}
\ No newline at end of file
index a2940c6974d8f2c28ba661927f2475ae08dd1f5f..edb468bdb17392ac5fa45f75a877194fa193cfeb 100644 (file)
@@ -1,16 +1,11 @@
 #pragma once\r
 \r
-#include "util.h"\r
-\r
 #include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
 \r
 #include <core/mixer/audio/audio_mixer.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
-#include <agents.h>\r
-\r
 #include <vector>\r
 \r
 struct AVFrame;\r
@@ -27,26 +22,25 @@ struct frame_factory;
 \r
 namespace ffmpeg {\r
 \r
-class frame_muxer2 : boost::noncopyable\r
+class frame_muxer : boost::noncopyable\r
 {\r
 public:\r
+       frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str);\r
        \r
-       typedef std::pair<safe_ptr<AVFrame>, ticket_t>                          video_source_element_t;\r
-       typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t>       audio_source_element_t;\r
-       typedef std::pair<safe_ptr<core::basic_frame>, ticket_t>        target_element_t;\r
-\r
-       typedef Concurrency::ISource<video_source_element_t>            video_source_t;\r
-       typedef Concurrency::ISource<audio_source_element_t>            audio_source_t;\r
-       typedef Concurrency::ITarget<target_element_t>                          target_t;\r
-                                                                \r
-       frame_muxer2(video_source_t* video_source,\r
-                                audio_source_t* audio_source, \r
-                                target_t& target,\r
-                                double in_fps, \r
-                                const safe_ptr<core::frame_factory>& frame_factory,\r
-                                const std::wstring& filter = L"");\r
+       void push(const std::shared_ptr<AVFrame>& video_frame, int hints = 0);\r
+       void push(const std::shared_ptr<core::audio_buffer>& audio_samples);\r
        \r
+       void commit();\r
+\r
+       bool video_ready() const;\r
+       bool audio_ready() const;\r
+\r
+       size_t size() const;\r
+       bool empty() const;\r
+\r
        int64_t calc_nb_frames(int64_t nb_frames) const;\r
+\r
+       safe_ptr<core::basic_frame> pop();\r
 private:\r
        struct implementation;\r
        safe_ptr<implementation> impl_;\r
index a5f2e8b0835018c0a82e9262bff8ee7588461cea..a2dc02c263a485a0bd9fbe6f972151c623c38c4c 100644 (file)
@@ -24,6 +24,7 @@
 #include "../stdafx.h"\r
 \r
 #include "input.h"\r
+\r
 #include "util.h"\r
 #include "../ffmpeg_error.h"\r
 \r
 #include <common/exception/exceptions.h>\r
 #include <common/exception/win32_exception.h>\r
 \r
+#include <tbb/concurrent_queue.h>\r
 #include <tbb/atomic.h>\r
 \r
-#include <agents.h>\r
-#include <concrt_extras.h>\r
+#include <boost/range/algorithm.hpp>\r
+#include <boost/thread/condition_variable.hpp>\r
+#include <boost/thread/mutex.hpp>\r
+#include <boost/thread/thread.hpp>\r
 \r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
@@ -52,120 +56,114 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
+#include <concrt_extras.h>\r
 \r
-#undef Yield\r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
 \r
-static const size_t MAX_TOKENS = 32;\r
+static const size_t MAX_BUFFER_SIZE  = 16 * 1000000;\r
+static const size_t MAX_BUFFER_COUNT = 100;\r
        \r
-struct input::implementation : public Concurrency::agent, boost::noncopyable\r
-{\r
-       input::target_t&                                                target_;\r
-\r
-       const std::wstring                                              filename_;\r
-       const safe_ptr<AVFormatContext>                 format_context_; // Destroy this last\r
-       int                                                                             default_stream_index_;\r
-       const boost::iterator_range<AVStream**> streams_;\r
+struct input::implementation : public agent, public std::enable_shared_from_this<input::implementation>, boost::noncopyable\r
+{              \r
+       input::target_t&                                                                                        target_;\r
+       const safe_ptr<diagnostics::graph>                                                      graph_;\r
 \r
-       safe_ptr<diagnostics::graph>                    graph_;\r
+       const safe_ptr<AVFormatContext>                                                         format_context_; // Destroy this last\r
+       const int                                                                                                       default_stream_index_;\r
                \r
-       tbb::atomic<bool>                                               loop_;\r
-       const size_t                                                    start_;         \r
-       const size_t                                                    length_;\r
-       size_t                                                                  frame_number_;\r
-                       \r
-       tbb::atomic<size_t>                                             nb_frames_;\r
-       tbb::atomic<size_t>                                             nb_loops_;      \r
-       tbb::atomic<size_t>                                             packets_count_;\r
-       tbb::atomic<size_t>                                             packets_size_;\r
-\r
-       overwrite_buffer<bool>                                  is_running_;\r
-       governor                                                                governor_;\r
+       const std::wstring                                                                                      filename_;\r
+       const bool                                                                                                      loop_;\r
+       const size_t                                                                                            start_;         \r
+       const size_t                                                                                            length_;\r
+       size_t                                                                                                          frame_number_;\r
+       \r
+       tbb::atomic<size_t>                                                                                     buffer_size_;\r
+       tbb::atomic<size_t>                                                                                     buffer_count_;\r
+       Concurrency::event                                                                                      event_;\r
                \r
+       tbb::atomic<bool>                                                                                       is_running_;\r
+\r
+       tbb::atomic<size_t>                                                                                     nb_frames_;\r
+       tbb::atomic<size_t>                                                                                     nb_loops_;\r
+\r
 public:\r
-       explicit implementation(input::target_t& target,\r
-                                                       const safe_ptr<diagnostics::graph>& graph, \r
-                                                       const std::wstring& filename, \r
-                                                       bool loop, \r
-                                                       size_t start,\r
-                                                       size_t length)\r
+       explicit implementation(input::target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
                : target_(target)\r
-               , filename_(filename)\r
-               , format_context_(open_input(filename))         \r
-               , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
-               , streams_(format_context_->streams, format_context_->streams + format_context_->nb_streams)\r
                , graph_(graph)\r
+               , format_context_(open_input(filename))\r
+               , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
+               , loop_(loop)\r
+               , filename_(filename)\r
                , start_(start)\r
                , length_(length)\r
                , frame_number_(0)\r
-               , governor_(8)\r
-       {               \r
-               loop_                   = loop;\r
-               packets_count_  = 0;\r
-               packets_size_   = 0;\r
-               nb_frames_              = 0;\r
-               nb_loops_               = 0;\r
-                               \r
-               //av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
-                               \r
+       {       \r
+               event_.set();\r
+               \r
                if(start_ > 0)                  \r
                        seek_frame(start_);\r
                                                                \r
-               graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+               graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));        \r
+               graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
+\r
+               agent::start();\r
        }\r
 \r
        ~implementation()\r
        {\r
-               if(is_running_.value())\r
-                       stop();\r
+               is_running_ = false;\r
+               event_.set();\r
+               agent::wait(this);\r
        }\r
-       \r
+               \r
        void stop()\r
        {\r
-               send(is_running_, false);\r
-               governor_.cancel();\r
+               is_running_ = false;\r
+               event_.set();\r
                agent::wait(this);\r
        }\r
-       \r
-       virtual void run()\r
-       {\r
-               try\r
-               {\r
-                       int last_stream_index = -1;\r
 \r
-                       send(is_running_, true);\r
-                       for(auto packet = read_next_packet(); packet && is_running_.value(); packet = read_next_packet())\r
-                       {                               \r
-                               ticket_t ticket;\r
+       size_t nb_frames() const\r
+       {\r
+               return nb_frames_;\r
+       }\r
 \r
-                               if((format_context_->nb_streams < 2 || last_stream_index != packet->stream_index) && packet->stream_index == default_stream_index_)\r
-                                       ticket = governor_.acquire();\r
-                               last_stream_index = packet->stream_index;\r
+       size_t nb_loops() const\r
+       {\r
+               return nb_loops_;\r
+       }\r
 \r
-                               Concurrency::asend(target_, input::target_element_t(packet, ticket));\r
-                               Context::Yield();\r
+private:\r
+       \r
+       virtual void run()\r
+       {               \r
+               try\r
+               {                       \r
+                       is_running_     = true;\r
+                       while(is_running_)\r
+                       {\r
+                               read_next_packet();             \r
+                               event_.wait();  \r
                        }\r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }       \r
-       \r
-               BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, input::target_element_t(eof_packet(stream->index), ticket_t()));     \r
-\r
+               }\r
+               \r
+               is_running_ = false;\r
                done();\r
        }\r
-\r
-       std::shared_ptr<AVPacket> read_next_packet()\r
-       {               \r
+                       \r
+       void read_next_packet()\r
+       {                               \r
                auto packet = create_packet();\r
-               \r
+\r
                int ret = [&]() -> int\r
                {\r
-                       Concurrency::scoped_oversubcription_token oversubscribe;\r
+                       scoped_oversubcription_token oversubscribe;\r
                        return av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life.      \r
                }();\r
 \r
@@ -173,7 +171,7 @@ public:
                {\r
                        ++nb_loops_;\r
                        frame_number_ = 0;\r
-\r
+                       \r
                        if(loop_)\r
                        {\r
                                CASPAR_LOG(trace) << print() << " Looping.";\r
@@ -183,37 +181,51 @@ public:
                        else\r
                        {\r
                                CASPAR_LOG(trace) << print() << " Stopping.";\r
-                               return nullptr;\r
+                               is_running_ = false;\r
                        }\r
                }\r
+               else\r
+               {               \r
+                       THROW_ON_ERROR(ret, "av_read_frame", print());\r
 \r
-               THROW_ON_ERROR(ret, "av_read_frame", print());\r
-\r
-               if(packet->stream_index == default_stream_index_)\r
-               {\r
-                       if(nb_loops_ == 0)\r
-                               ++nb_frames_;\r
-                       ++frame_number_;\r
-               }\r
+                       if(packet->stream_index == default_stream_index_)\r
+                       {\r
+                               if(nb_loops_ == 0)\r
+                                       ++nb_frames_;\r
+                               ++frame_number_;\r
+                       }\r
 \r
-               THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+                       THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
                                \r
-               // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
-               auto size = packet->size;\r
-               auto data = packet->data;                       \r
+                       // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+                       auto size = packet->size;\r
+                       auto data = packet->data;\r
 \r
-               packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
-               {\r
-                       packet->size = size;\r
-                       packet->data = data;\r
-                       --packets_count_;\r
-               });\r
-\r
-               ++packets_count_;\r
-                                                       \r
-               return packet;\r
-       }\r
+                       auto self = shared_from_this();\r
+                       packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+                       {\r
+                               packet->size = size;\r
+                               packet->data = data;\r
+\r
+                               self->buffer_size_ -= packet->size;\r
+                               --self->buffer_count_;\r
+                               self->event_.set();\r
+                               self->graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+                               self->graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
+                       });\r
+                       \r
+                       buffer_size_ += packet->size;\r
+                       ++buffer_count_;\r
+                       if((buffer_size_ > MAX_BUFFER_SIZE || buffer_count_ > MAX_BUFFER_COUNT) && is_running_)\r
+                               event_.reset();\r
 \r
+                       send(target_, std::shared_ptr<AVPacket>(packet));\r
+                               \r
+                       graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+                       graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
+               }                       \r
+       }\r
+       \r
        void seek_frame(int64_t frame, int flags = 0)\r
        {                       \r
                if(flags == AVSEEK_FLAG_BACKWARD)\r
@@ -229,14 +241,11 @@ public:
                }\r
 \r
                THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print());    \r
-               auto packet = create_packet();\r
-               packet->size = 0;\r
-\r
-               BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::asend(target_, input::target_element_t(loop_packet(stream->index), ticket_t()));   \r
+       \r
+               Concurrency::asend(target_, std::shared_ptr<AVPacket>());       \r
 \r
                graph_->add_tag("seek");                \r
-       }               \r
+       }       \r
 \r
        bool is_eof(int ret)\r
        {\r
@@ -245,8 +254,6 @@ public:
                if(ret == AVERROR_EOF)\r
                        CASPAR_LOG(trace) << print() << " Received EOF. " << nb_frames_;\r
 \r
-               CASPAR_VERIFY(ret >= 0 || ret == AVERROR_EOF || ret == AVERROR(EIO), ffmpeg_error() << source_info(narrow(print())));\r
-\r
                return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF;\r
        }\r
        \r
@@ -256,48 +263,11 @@ public:
        }\r
 };\r
 \r
-input::input(input::target_t& target,\r
-                        const safe_ptr<diagnostics::graph>& graph, \r
-                        const std::wstring& filename, \r
-                        bool loop, \r
-                        size_t start, \r
-                        size_t length)\r
-       : impl_(new implementation(target, graph, filename, loop, start, length))\r
-{\r
-}\r
-\r
-safe_ptr<AVFormatContext> input::context()\r
-{\r
-       return safe_ptr<AVFormatContext>(impl_->format_context_);\r
-}\r
-\r
-size_t input::nb_frames() const\r
-{\r
-       return impl_->nb_frames_;\r
-}\r
-\r
-size_t input::nb_loops() const \r
-{\r
-       return impl_->nb_loops_;\r
-}\r
-\r
-void input::start()\r
-{\r
-       impl_->start();\r
-}\r
-\r
-void input::stop()\r
-{\r
-       impl_->stop();\r
-}\r
-\r
-bool input::loop() const\r
-{\r
-       return impl_->loop_;\r
-}\r
-void input::loop(bool value)\r
-{\r
-       impl_->loop_ = value;\r
-}\r
-\r
+input::input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length)\r
+       : impl_(new implementation(target, graph, filename, loop, start, length)){}\r
+bool input::eof() const {return !impl_->is_running_;}\r
+safe_ptr<AVFormatContext> input::context(){return impl_->format_context_;}\r
+size_t input::nb_frames() const {return impl_->nb_frames();}\r
+size_t input::nb_loops() const {return impl_->nb_loops();}\r
+void input::stop(){impl_->stop();}\r
 }}
\ No newline at end of file
index e666847c17ead020e5b44420bddda5849010da80..5688838cb1e3207f75c02cafda3786dd23625745 100644 (file)
 */\r
 #pragma once\r
 \r
-#include "util.h"\r
-\r
 #include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
-\r
-#include <agents.h>\r
-#include <concrt.h>\r
 \r
 #include <memory>\r
 #include <string>\r
 \r
 #include <boost/noncopyable.hpp>\r
-#include <boost/range/iterator_range.hpp>\r
+\r
+#include <agents.h>\r
 \r
 struct AVFormatContext;\r
 struct AVPacket;\r
@@ -45,33 +40,23 @@ class graph;
 }\r
         \r
 namespace ffmpeg {\r
-                               \r
+\r
 class input : boost::noncopyable\r
 {\r
 public:\r
-       \r
-       typedef std::pair<safe_ptr<AVPacket>, ticket_t> target_element_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
+\r
+       explicit input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
 \r
-       typedef Concurrency::ITarget<target_element_t> target_t;\r
+       bool eof() const;\r
+\r
+       void stop();\r
 \r
-       explicit input(target_t& target, \r
-                                  const safe_ptr<diagnostics::graph>& graph, \r
-                                  const std::wstring& filename, bool loop, \r
-                                  size_t start = 0, \r
-                                  size_t length = std::numeric_limits<size_t>::max());\r
-               \r
        size_t nb_frames() const;\r
        size_t nb_loops() const;\r
-       \r
-       safe_ptr<AVFormatContext> context();\r
 \r
-       bool loop() const;\r
-       void loop(bool value);\r
-\r
-       void start();\r
-       void stop();\r
+       safe_ptr<AVFormatContext> context();\r
 private:\r
-       friend struct implemenation;\r
        struct implementation;\r
        std::shared_ptr<implementation> impl_;\r
 };\r
index 2e55d19f0dd8d698a72572015d346c8fef52ed94..36da9d2d37d1cfe64ad518494a1222bfbbbc9e5b 100644 (file)
 \r
 #include "../../ffmpeg_error.h"\r
 \r
-#include <core/producer/frame/basic_frame.h>\r
-#include <common/memory/memcpy.h>\r
+#include <core/producer/frame/frame_transform.h>\r
+#include <core/producer/frame/frame_factory.h>\r
+\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+#include <boost/filesystem.hpp>\r
+\r
+#include <queue>\r
 \r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
@@ -42,168 +47,119 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <tbb/scalable_allocator.h>\r
-\r
-#undef Yield\r
-using namespace Concurrency;\r
-\r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
-{      \r
+struct video_decoder::implementation : boost::noncopyable\r
+{\r
+       const safe_ptr<core::frame_factory>             frame_factory_;\r
        int                                                                             index_;\r
-       std::shared_ptr<AVCodecContext>                 codec_context_;\r
-       \r
-       double                                                                  fps_;\r
-       int64_t                                                                 nb_frames_;\r
+       safe_ptr<AVCodecContext>                                codec_context_;\r
 \r
-       size_t                                                                  width_;\r
-       size_t                                                                  height_;\r
-       bool                                                                    is_progressive_;\r
-       \r
-       unbounded_buffer<video_decoder::source_element_t>       source_;\r
-       ITarget<video_decoder::target_element_t>&                       target_;\r
+       std::queue<std::shared_ptr<AVPacket>>   packets_;\r
        \r
+       const double                                                    fps_;\r
+       const int64_t                                                   nb_frames_;\r
+       const size_t                                                    width_;\r
+       const size_t                                                    height_;\r
+\r
 public:\r
-       explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
-               : codec_context_(open_codec(context, AVMEDIA_TYPE_VIDEO, index_))\r
+       explicit implementation(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) \r
+               : frame_factory_(frame_factory)\r
+               , codec_context_(open_codec(*context, AVMEDIA_TYPE_VIDEO, index_))\r
                , fps_(static_cast<double>(codec_context_->time_base.den) / static_cast<double>(codec_context_->time_base.num))\r
-               , nb_frames_(context.streams[index_]->nb_frames)\r
+               , nb_frames_(context->streams[index_]->nb_frames)\r
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
-               , is_progressive_(true)\r
-               , source_([this](const video_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
-               , target_(target)\r
-       {               \r
-               CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
-               \r
-               CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
-               CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
+       {                       \r
+       }\r
 \r
-               source.link_target(&source_);\r
+       void push(const std::shared_ptr<AVPacket>& packet)\r
+       {\r
+               if(packet && packet->stream_index != index_)\r
+                       return;\r
 \r
-               start();\r
+               packets_.push(packet);\r
        }\r
 \r
-       ~implementation()\r
-       {\r
-               agent::wait(this);\r
+       std::vector<std::shared_ptr<AVFrame>> poll()\r
+       {               \r
+               std::vector<std::shared_ptr<AVFrame>> result;\r
+\r
+               if(packets_.empty())\r
+                       return result;\r
+               \r
+               auto packet = packets_.front();\r
+                                       \r
+               if(packet)\r
+               {                       \r
+                       boost::range::push_back(result, decode(*packet));\r
+\r
+                       if(packet->size == 0)\r
+                               packets_.pop();\r
+               }\r
+               else\r
+               {\r
+                       if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+                       {\r
+                               AVPacket pkt;\r
+                               av_init_packet(&pkt);\r
+                               pkt.data = nullptr;\r
+                               pkt.size = 0;\r
+\r
+                               boost::range::push_back(result, decode(pkt));   \r
+                       }\r
+\r
+                       if(result.empty())\r
+                       {                                       \r
+                               packets_.pop();\r
+                               avcodec_flush_buffers(codec_context_.get());\r
+                               result.push_back(nullptr);\r
+                       }\r
+               }\r
+               \r
+               return result;\r
        }\r
        \r
-       std::shared_ptr<AVFrame> decode(AVPacket& packet)\r
+       std::vector<std::shared_ptr<AVFrame>> decode(AVPacket& pkt)\r
        {\r
                std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
 \r
                int frame_finished = 0;\r
-               THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &packet), "[video_decocer]");\r
-\r
-               // 1 packet <=> 1 frame.\r
+               THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, &pkt), "[video_decocer]");\r
+               \r
                // If a decoder consumes less then the whole packet then something is wrong\r
                // that might be just harmless padding at the end, or a problem with the\r
                // AVParser or demuxer which puted more then one frame in a AVPacket.\r
+               pkt.data = nullptr;\r
+               pkt.size = 0;\r
 \r
                if(frame_finished == 0) \r
-                       return nullptr;\r
-                               \r
-               if(decoded_frame->repeat_pict > 0)\r
-                       CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+                       return std::vector<std::shared_ptr<AVFrame>>();\r
 \r
-               return decoded_frame;\r
-       }\r
-\r
-       virtual void run()\r
-       {\r
-               try\r
-               {\r
-                       while(true)\r
-                       {\r
-                               auto element = receive(source_);\r
-                               auto packet = element.first;\r
-                       \r
-                               if(packet == loop_packet(index_))\r
-                               {                                       \r
-                                       if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
-                                       {\r
-                                               AVPacket pkt;\r
-                                               av_init_packet(&pkt);\r
-                                               pkt.data = nullptr;\r
-                                               pkt.size = 0;\r
-\r
-                                               for(auto decoded_frame = decode(pkt); decoded_frame; decoded_frame = decode(pkt))\r
-                                               {\r
-                                                       send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second));\r
-                                                       Context::Yield();\r
-                                               }\r
-                                       }\r
-\r
-                                       avcodec_flush_buffers(codec_context_.get());\r
-                                       send(target_, target_element_t(loop_video(), ticket_t()));\r
-                                       continue;\r
-                               }\r
-\r
-                               if(packet == eof_packet(index_))\r
-                                       break;\r
-\r
-                               auto decoded_frame = decode(*packet);\r
-                               if(!decoded_frame)\r
-                                       continue;\r
-               \r
-                               is_progressive_ = decoded_frame->interlaced_frame == 0;\r
-                               \r
-                               // C-TODO: Avoid duplication.\r
-                               // Need to dupliace frame data since avcodec_decode_video2 reuses it.\r
-                               send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second));                               \r
-                               Context::Yield();\r
-                       }\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }\r
+               if(decoded_frame->repeat_pict % 2 > 0)\r
+                       CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
                \r
-               send(target_, target_element_t(eof_video(), ticket_t()));\r
-\r
-               done();\r
+               return std::vector<std::shared_ptr<AVFrame>>(1 + decoded_frame->repeat_pict/2, decoded_frame);\r
        }\r
-\r
-       safe_ptr<AVFrame> dup_frame(const safe_ptr<AVFrame>& frame)\r
+       \r
+       bool ready() const\r
        {\r
-               auto desc = get_pixel_format_desc(static_cast<PixelFormat>(frame->format), frame->width, frame->height);\r
-\r
-               auto count = desc.planes.size();\r
-               std::array<uint8_t*, 4> org_ptrs;\r
-               std::array<safe_ptr<uint8_t>, 4> new_ptrs;\r
-               parallel_for<size_t>(0, count, [&](size_t n)\r
-               {\r
-                       CASPAR_ASSERT(frame->data[n]);\r
-                       auto size               = frame->linesize[n]*desc.planes[n].height;\r
-                       new_ptrs[n]             = fast_memdup(frame->data[n], size);\r
-                       org_ptrs[n]             = frame->data[n];\r
-                       frame->data[n]  = new_ptrs[n].get();\r
-               });\r
-\r
-               return safe_ptr<AVFrame>(frame.get(), [frame, org_ptrs, new_ptrs, count](AVFrame*)\r
-               {\r
-                       for(size_t n = 0; n < count; ++n)\r
-                               frame->data[n] = org_ptrs[n];\r
-               });\r
+               return !packets_.empty();\r
        }\r
-               \r
+       \r
        double fps() const\r
        {\r
                return fps_;\r
        }\r
 };\r
 \r
-video_decoder::video_decoder(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
-       : impl_(new implementation(source, target, context))\r
-{\r
-}\r
-\r
+video_decoder::video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) : impl_(new implementation(context, frame_factory)){}\r
+void video_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
+std::vector<std::shared_ptr<AVFrame>> video_decoder::poll(){return impl_->poll();}\r
+bool video_decoder::ready() const{return impl_->ready();}\r
 double video_decoder::fps() const{return impl_->fps();}\r
 int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
 size_t video_decoder::width() const{return impl_->width_;}\r
 size_t video_decoder::height() const{return impl_->height_;}\r
-bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
 \r
 }}
\ No newline at end of file
index 4bf30dac7e90bb5b728340ae49510cba2da8c6b9..7742d18de2b041126f1eb3f9f466543c995435aa 100644 (file)
 */\r
 #pragma once\r
 \r
-#include "../util.h"\r
-\r
 #include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/governor.h>\r
 \r
 #include <core/video_format.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
-#include <agents.h>\r
-\r
 #include <vector>\r
 \r
 struct AVFormatContext;\r
@@ -40,6 +35,7 @@ namespace caspar {
 \r
 namespace core {\r
        struct frame_factory;\r
+       class write_frame;\r
 }\r
 \r
 namespace ffmpeg {\r
@@ -47,20 +43,16 @@ namespace ffmpeg {
 class video_decoder : boost::noncopyable\r
 {\r
 public:\r
+       explicit video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory);\r
        \r
-       typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
-       typedef std::pair<safe_ptr<AVFrame>, ticket_t> target_element_t;\r
-\r
-       typedef Concurrency::ISource<source_element_t>  source_t;\r
-       typedef Concurrency::ITarget<target_element_t>  target_t;\r
+       void push(const std::shared_ptr<AVPacket>& packet);\r
+       bool ready() const;\r
+       std::vector<std::shared_ptr<AVFrame>> poll();\r
        \r
-       explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context);   \r
-\r
        size_t width() const;\r
        size_t height() const;\r
 \r
        int64_t nb_frames() const;\r
-       bool is_progressive() const;\r
 \r
        double fps() const;\r
 private:\r
index 9351152d545f6d08c20541ff61f8adb159afebca..10b31e58189251f3d0e56d47be39c1c9c56b96de 100644 (file)
         </decklink>\r
       </consumers>\r
     </channel>\r
-    <channel>\r
-      <video-mode>PAL</video-mode>\r
-      <consumers>\r
-        <bluefish>\r
-          <device>1</device>\r
-          <embedded-audio>true</embedded-audio>\r
-        </bluefish>\r
-      </consumers>\r
-    </channel>\r
   </channels>\r
   <controllers>\r
     <tcp>\r