]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 24 Oct 2011 17:05:30 +0000 (17:05 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 24 Oct 2011 17:05:30 +0000 (17:05 +0000)
17 files changed:
common/concurrency/governor.h
core/consumer/output.cpp
core/consumer/output.h
core/mixer/mixer.cpp
core/mixer/mixer.h
core/producer/stage.h
core/video_channel.cpp
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/input.h
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h

index 30485a2457cfd64b9fba914f3258f592bcb03687..10bb311a47e0243263d3b5c8c2b3dd13cc2aa002 100644 (file)
@@ -7,7 +7,8 @@
 #include <tbb/atomic.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
-#include <boost/any.hpp>\r
+\r
+#include <vector>\r
 \r
 namespace caspar {\r
        \r
@@ -77,17 +78,23 @@ namespace caspar {
 //     };\r
 //}\r
        \r
-typedef safe_ptr<int> ticket_t;\r
+#undef Yield\r
+\r
+typedef std::vector<safe_ptr<int>> ticket_t;\r
 \r
 class governor : boost::noncopyable\r
 {\r
        tbb::atomic<int> count_;\r
        Concurrency::concurrent_queue<Concurrency::Context*> waiting_contexts_;\r
 \r
-       void acquire_ticket(Concurrency::Context* context)\r
+       void acquire_ticket()\r
        {\r
+               if(count_ < 1)\r
+                       Concurrency::Context::Yield();\r
+\r
                if (--count_ < 0)\r
                {\r
+                       auto context = Concurrency::Context::CurrentContext();\r
                        waiting_contexts_.push(context);\r
                        context->Block();\r
                }\r
@@ -99,7 +106,7 @@ class governor : boost::noncopyable
                {\r
                        Concurrency:: Context* waiting = NULL;\r
                        while(!waiting_contexts_.try_pop(waiting))\r
-                               Concurrency::wait(0);\r
+                               Concurrency::Context::Yield();\r
                        waiting->Unblock();\r
                }\r
        }\r
@@ -113,18 +120,20 @@ public:
        \r
        ticket_t acquire()\r
        {\r
-               acquire_ticket(Concurrency::Context::CurrentContext());\r
+               acquire_ticket();\r
                \r
-               return safe_ptr<int>(new int, [this](int* p)\r
+               ticket_t ticket;\r
+               ticket.push_back(safe_ptr<int>(new int, [this](int* p)\r
                {\r
                        delete p;\r
                        release_ticket();\r
-               });\r
+               }));\r
+               return ticket;\r
        }\r
 \r
        void cancel()\r
        {\r
-               while(count_ <= 0)\r
+               while(count_ < 0)\r
                        release_ticket();\r
        }\r
 };\r
index a33459621e8311e65355ea5998c304fe67ff24b6..3d852b0586c9b0942cfdf0d846690829ab2fdd48 100644 (file)
@@ -95,7 +95,7 @@ public:
                                                \r
        void execute(const output::source_element_t& element)\r
        {       \r
-               auto frame = element->first;\r
+               auto frame = element.first;\r
 \r
                {\r
                        critical_section::scoped_lock lock(mutex_);             \r
index e719595b792be8eb67643d4a019626762bdf20a3..73102093fba613aa3d2138f5c3ac71ce16962dd9 100644 (file)
@@ -35,8 +35,8 @@ class video_channel_context;
 class output : boost::noncopyable\r
 {\r
 public:\r
-       typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>>       source_element_t;\r
-       typedef Concurrency::ISource<source_element_t>                                          source_t;\r
+       typedef std::pair<safe_ptr<core::read_frame>, ticket_t> source_element_t;\r
+       typedef Concurrency::ISource<source_element_t>                  source_t;\r
 \r
        explicit output(source_t& source, const video_format_desc& format_desc);\r
 \r
index f4d6dbd022d3e871d62dd6904c8ccbde31b4c286..08f24b54fa65731f23a5462ed37f39edd8554ee0 100644 (file)
@@ -111,7 +111,7 @@ public:
                \r
        mixer::target_element_t mix(const mixer::source_element_t& element)\r
        {               \r
-               auto frames = element->first;\r
+               auto frames = element.first;\r
 \r
                auto frame = make_safe<read_frame>();\r
 \r
@@ -158,7 +158,7 @@ public:
                        Concurrency::wait(20);\r
                }\r
 \r
-               return mixer::target_element_t(std::make_pair(std::move(frame), element->second));      \r
+               return mixer::target_element_t(std::move(frame), element.second);       \r
        }\r
                                                \r
        boost::unique_future<safe_ptr<core::write_frame>> async_create_frame(const void* tag, const core::pixel_format_desc& desc)\r
index 6193b69d6a6c6b519083710bd3997f9aa7620894..4416e28110cc444371f1f19af5d7abb3b6a7f796 100644 (file)
@@ -49,8 +49,8 @@ class mixer : public core::frame_factory
 {\r
 public:        \r
        \r
-       typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> source_element_t;\r
-       typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>>                       target_element_t;\r
+       typedef std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>       source_element_t;\r
+       typedef std::pair<safe_ptr<core::read_frame>, ticket_t>                         target_element_t;\r
 \r
        typedef Concurrency::ISource<source_element_t>                                                          source_t;\r
        typedef Concurrency::ITarget<target_element_t>                                                          target_t;\r
index 3a925e80982747cf6f727d36da39b4b6cdebf944..e89a8c4be83ba70aa0a1e6028b05bf312cd3a4d7 100644 (file)
@@ -39,8 +39,8 @@ class stage : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> target_element_t;\r
-       typedef Concurrency::ITarget<target_element_t> target_t;\r
+       typedef std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>       target_element_t;\r
+       typedef Concurrency::ITarget<target_element_t>                                          target_t;\r
 \r
        explicit stage(target_t& target, governor& governor);\r
 \r
index 7e1977a297934957338255e507c47f34d8b4ec8f..2aae8e945daa869dd7df4d1711bfca26773dafe3 100644 (file)
@@ -47,8 +47,8 @@ namespace caspar { namespace core {
 \r
 struct video_channel::implementation : boost::noncopyable\r
 {\r
-       unbounded_buffer<safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>>>   stage_frames_;\r
-       unbounded_buffer<safe_ptr<std::pair<safe_ptr<read_frame>, ticket_t>>>                                   mixer_frames_;\r
+       unbounded_buffer<stage::target_element_t>       stage_frames_;\r
+       unbounded_buffer<mixer::target_element_t>       mixer_frames_;\r
        \r
        const video_format_desc                         format_desc_;\r
        \r
index e9161eba0b27a49d47506097fce922065f76f7e3..10fd7947455c2ee4c3ad67825f5eb45954f948be 100644 (file)
@@ -171,9 +171,9 @@ public:
        \r
 class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
 {              \r
-       Concurrency::bounded_buffer<safe_ptr<AVFrame>>                          video_frames_;\r
-       Concurrency::bounded_buffer<safe_ptr<core::audio_buffer>>       audio_buffers_;\r
-       Concurrency::bounded_buffer<safe_ptr<core::basic_frame>>        muxed_frames_;\r
+       Concurrency::bounded_buffer<ffmpeg::frame_muxer2::video_source_element_t>       video_frames_;\r
+       Concurrency::bounded_buffer<ffmpeg::frame_muxer2::audio_source_element_t>       audio_buffers_;\r
+       Concurrency::bounded_buffer<ffmpeg::frame_muxer2::target_element_t>                     muxed_frames_;\r
 \r
        const core::video_format_desc           format_desc_;\r
        const size_t                                            device_index_;\r
@@ -214,7 +214,8 @@ public:
 \r
                try\r
                {\r
-                       last_frame_ = frame = Concurrency::receive(muxed_frames_);\r
+                       auto frame_element = Concurrency::receive(muxed_frames_);\r
+                       last_frame_ = frame = frame_element.first;\r
                }\r
                catch(Concurrency::operation_timed_out&)\r
                {               \r
@@ -283,7 +284,7 @@ public:
                                Concurrency::parallel_invoke(\r
                                [&]\r
                                {\r
-                                       Concurrency::send(video_frames_, av_frame);                                     \r
+                                       Concurrency::send(video_frames_, ffmpeg::frame_muxer2::video_source_element_t(av_frame, ticket_t()));                                   \r
                                },\r
                                [&]\r
                                {                                                                                                       \r
@@ -292,10 +293,10 @@ public:
                                        {\r
                                                auto sample_frame_count = audio->GetSampleFrameCount();\r
                                                auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
-                                               Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
+                                               Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), ticket_t()));\r
                                        }\r
                                        else\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::empty_audio());       \r
+                                               Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(ffmpeg::empty_audio(), ticket_t()));     \r
                                });\r
                        }\r
 \r
index 1afdb7f34157aa1e3823b65fe24b1757320f6a12..704739f135fffb5f07a66c14aa05c8da9de2963d 100644 (file)
@@ -42,6 +42,7 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
+#undef Yield\r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
@@ -55,9 +56,8 @@ struct audio_decoder::implementation : public agent, boost::noncopyable
        \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
 \r
-       overwrite_buffer<bool>                                  is_running_;\r
-       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
-       ITarget<safe_ptr<core::audio_buffer>>&  target_;\r
+       unbounded_buffer<audio_decoder::source_element_t>                       source_;\r
+       ITarget<audio_decoder::target_element_t>&                                       target_;\r
        \r
 public:\r
        explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
@@ -66,10 +66,7 @@ public:
                                         format_desc.audio_sample_rate, codec_context_->sample_rate,\r
                                         AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , source_([this](const safe_ptr<AVPacket>& packet)\r
-                       {\r
-                               return packet->stream_index == index_;\r
-                       })\r
+               , source_([this](const audio_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
                , target_(target)\r
        {                               \r
                CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
@@ -81,7 +78,6 @@ public:
 \r
        ~implementation()\r
        {\r
-               send(is_running_, false);\r
                agent::wait(this);\r
        }\r
 \r
@@ -89,14 +85,14 @@ public:
        {\r
                try\r
                {\r
-                       send(is_running_, true);\r
-                       while(is_running_.value())\r
+                       while(true)\r
                        {                               \r
-                               auto packet = receive(source_);\r
+                               auto element = receive(source_);\r
+                               auto packet = element.first;\r
                        \r
                                if(packet == loop_packet(index_))\r
                                {\r
-                                       send(target_, loop_audio());\r
+                                       send(target_, audio_decoder::target_element_t(loop_audio(), ticket_t()));\r
                                        continue;\r
                                }\r
 \r
@@ -123,9 +119,9 @@ public:
                                        const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
                                        const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
 \r
-                                       send(target_, make_safe<core::audio_buffer>(samples, samples + n_samples));\r
+                                       send(target_, audio_decoder::target_element_t(make_safe<core::audio_buffer>(samples, samples + n_samples), element.second));\r
+                                       Context::Yield();\r
                                }\r
-                               Concurrency::wait(5);\r
                        }\r
                }\r
                catch(...)\r
@@ -133,8 +129,7 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
 \r
-               send(is_running_, false);\r
-               send(target_, eof_audio());\r
+               send(target_, audio_decoder::target_element_t(eof_audio(), ticket_t()));\r
 \r
                done();\r
        }\r
index 292b87baf0b5ce8c65e80f133cb7899193a1f56c..19da2325b1c74d4f53f7e2159f2718a2454b39ad 100644 (file)
@@ -24,6 +24,7 @@
 #include <core/mixer/audio/audio_mixer.h>\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
@@ -47,8 +48,11 @@ class audio_decoder : boost::noncopyable
 {\r
 public:\r
 \r
-       typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
+       typedef std::pair<safe_ptr<AVPacket>, ticket_t>                         source_element_t;\r
+       typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t>       target_element_t;\r
+\r
+       typedef Concurrency::ISource<source_element_t>&                         source_t;\r
+       typedef Concurrency::ITarget<target_element_t>&                         target_t;\r
        \r
        explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
        \r
index aa3bc6101f8d3fe292dbee511e2954876006274e..ec114b0e23a8b32cdf70911b1c7bdda42deade34 100644 (file)
@@ -57,25 +57,25 @@ namespace caspar { namespace ffmpeg {
                \r
 struct ffmpeg_producer : public core::frame_producer\r
 {      \r
-       const std::wstring                                                              filename_;\r
-       const int                                                                               start_;\r
-       const bool                                                                              loop_;\r
-       const size_t                                                                    length_;\r
+       const std::wstring                                                                              filename_;\r
+       const int                                                                                               start_;\r
+       const bool                                                                                              loop_;\r
+       const size_t                                                                                    length_;\r
        \r
-       call<safe_ptr<AVPacket>>                                                throw_away_;\r
-       unbounded_buffer<safe_ptr<AVPacket>>                    packets_;\r
-       unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
-       unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
-       bounded_buffer<safe_ptr<core::basic_frame>>             frames_;\r
+       call<input::target_element_t>                                                   throw_away_;\r
+       unbounded_buffer<input::target_element_t>                               packets_;\r
+       unbounded_buffer<frame_muxer2::video_source_element_t>  video_;\r
+       unbounded_buffer<frame_muxer2::audio_source_element_t>  audio_;\r
+       unbounded_buffer<frame_muxer2::target_element_t>                frames_;\r
                \r
-       const safe_ptr<diagnostics::graph>                              graph_;\r
+       const safe_ptr<diagnostics::graph>                                              graph_;\r
                                        \r
-       input                                                                                   input_; \r
-       std::shared_ptr<video_decoder>                                  video_decoder_;\r
-       std::shared_ptr<audio_decoder>                                  audio_decoder_; \r
-       std::unique_ptr<frame_muxer2>                                   muxer_;\r
+       input                                                                                                   input_; \r
+       std::unique_ptr<frame_muxer2>                                                   muxer_;\r
+       std::shared_ptr<video_decoder>                                                  video_decoder_;\r
+       std::shared_ptr<audio_decoder>                                                  audio_decoder_; \r
 \r
-       safe_ptr<core::basic_frame>                                             last_frame_;\r
+       safe_ptr<core::basic_frame>                                                             last_frame_;\r
        \r
 public:\r
        explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
@@ -83,8 +83,7 @@ public:
                , start_(start)\r
                , loop_(loop)\r
                , length_(length)\r
-               , throw_away_([](const safe_ptr<AVPacket>&){})\r
-               , frames_(2)\r
+               , throw_away_([](const input::target_element_t&){})\r
                , graph_(diagnostics::create_graph("", false))\r
                , input_(packets_, graph_, filename_, loop, start, length)\r
                , last_frame_(core::basic_frame::empty())\r
@@ -136,10 +135,7 @@ public:
 \r
        ~ffmpeg_producer()\r
        {\r
-               input_.stop();                  \r
-               while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
-               {\r
-               }\r
+               input_.stop();  \r
        }\r
                                                \r
        virtual safe_ptr<core::basic_frame> receive(int hints)\r
@@ -148,7 +144,8 @@ public:
                \r
                try\r
                {               \r
-                       frame = last_frame_ = Concurrency::receive(frames_, 10);\r
+                       auto frame_element = Concurrency::receive(frames_, 10);\r
+                       frame = last_frame_ = frame_element.first;\r
                        graph_->update_text(narrow(print()));\r
                }\r
                catch(operation_timed_out&)\r
index 86252eea61b5f0aab337a7129b8323c2b2817104..58cc11bc3eada31544fdfccd24ada06e6ebb2b8b 100644 (file)
@@ -16,6 +16,8 @@
 #include <common/exception/exceptions.h>\r
 #include <common/log/log.h>\r
 \r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+\r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
 #pragma warning (disable : 4244)\r
@@ -39,7 +41,13 @@ namespace caspar { namespace ffmpeg {
        \r
 struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
 {              \r
-       ITarget<safe_ptr<core::basic_frame>>&                   target_;\r
+       typedef std::pair<std::shared_ptr<core::write_frame>, ticket_t>         write_element_t;\r
+       typedef std::pair<std::shared_ptr<core::audio_buffer>, ticket_t>        audio_element_t;\r
+\r
+       frame_muxer2::video_source_t* video_source_;\r
+       frame_muxer2::audio_source_t* audio_source_;\r
+\r
+       ITarget<frame_muxer2::target_element_t>&                target_;\r
        mutable single_assignment<display_mode::type>   display_mode_;\r
        const double                                                                    in_fps_;\r
        const core::video_format_desc                                   format_desc_;\r
@@ -47,16 +55,11 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
        \r
        mutable single_assignment<safe_ptr<filter>>             filter_;\r
        const safe_ptr<core::frame_factory>                             frame_factory_;\r
-       \r
-       call<safe_ptr<AVFrame>>                                                 push_video_;\r
-       call<safe_ptr<core::audio_buffer>>                              push_audio_;\r
-       \r
-       unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
-       unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
-       \r
+                       \r
        core::audio_buffer                                                              audio_data_;\r
-\r
-       overwrite_buffer<bool>                                                  is_running_;\r
+       \r
+       std::queue<write_element_t>                                             video_frames_;\r
+       std::queue<audio_element_t>                                             audio_buffers_;\r
 \r
        std::wstring                                                                    filter_str_;\r
        \r
@@ -66,97 +69,121 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                                   double in_fps, \r
                                   const safe_ptr<core::frame_factory>& frame_factory,\r
                                   const std::wstring& filter)\r
-               : target_(target)\r
+               : video_source_(video_source)\r
+               , audio_source_(audio_source)\r
+               , target_(target)\r
                , in_fps_(in_fps)\r
                , format_desc_(frame_factory->get_video_format_desc())\r
                , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
                , frame_factory_(frame_factory)\r
-               , push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
-               , push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
-       {\r
-               if(video_source)\r
-                       video_source->link_target(&push_video_);\r
-               if(audio_source)\r
-                       audio_source->link_target(&push_audio_);\r
-               \r
+       {               \r
                start();\r
        }\r
 \r
        ~implementation()\r
        {\r
-               send(is_running_, false);\r
                agent::wait(this);\r
-               CASPAR_LOG(trace) << "[frame_muxer] Stopped.";\r
        }\r
-\r
-       std::shared_ptr<core::write_frame> receive_video()\r
+                               \r
+       write_element_t receive_video()\r
        {       \r
-               auto video = receive(video_);\r
-\r
-               if(!is_running_.value() || video == eof_video())\r
-               {       \r
-                       send(is_running_ , false);\r
-                       return nullptr;\r
+               if(!video_frames_.empty())\r
+               {\r
+                       auto video_frame = video_frames_.front();\r
+                       video_frames_.pop();\r
+                       return video_frame;\r
                }\r
 \r
-               CASPAR_ASSERT(video != loop_video());\r
+               auto element = receive(video_source_);\r
+               auto video = element.first;\r
 \r
-               try\r
+               if(video == eof_video())\r
+                       return write_element_t(nullptr, ticket_t());    \r
+               else if(video == empty_video())\r
+                       return write_element_t(make_safe<core::write_frame>(this), ticket_t());\r
+               else if(video != loop_video())\r
                {\r
-                       if(video == empty_video())\r
-                               return make_safe<core::write_frame>(this);\r
+                       if(!display_mode_.has_value())\r
+                               initialize_display_mode(*video);\r
 \r
-                       return make_write_frame(this, video, frame_factory_, 0);\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       send(is_running_, false);\r
-                       return nullptr;\r
-               }\r
-       }\r
+                       auto format = video->format;\r
+                       if(video->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+                               video->format = PIX_FMT_GRAY8;\r
 \r
-       std::shared_ptr<core::audio_buffer> receive_audio()\r
-       {               \r
-               auto audio = receive(audio_);\r
+                       filter_.value()->push(video);\r
+                       while(true)\r
+                       {\r
+                               auto frame = filter_.value()->poll();\r
+                               if(!frame)\r
+                                       break;  \r
 \r
-               if(!is_running_.value() || audio == eof_audio())\r
-               {\r
-                       send(is_running_ , false);\r
-                       return nullptr;\r
+                               frame->format = format;\r
+                               video_frames_.push(write_element_t(make_write_frame(this, video, frame_factory_, 0), element.second));\r
+                       }\r
                }\r
 \r
-               CASPAR_ASSERT(audio != loop_audio());\r
-\r
-               try\r
+               return receive_video();\r
+       }\r
+       \r
+       audio_element_t receive_audio()\r
+       {               \r
+               if(!audio_buffers_.empty())\r
                {\r
-                       if(audio == empty_audio())\r
-                               return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
-\r
-                       return audio;\r
+                       auto audio_buffer = audio_buffers_.front();\r
+                       audio_buffers_.pop();\r
+                       return audio_buffer;\r
                }\r
-               catch(...)\r
+               \r
+               auto element = receive(audio_source_);\r
+               auto audio = element.first;\r
+\r
+               if(audio == eof_audio())\r
+                       return audio_element_t(nullptr, ticket_t());    \r
+               else if(audio == empty_audio())         \r
+                       audio_data_.resize(audio_data_.size() + format_desc_.audio_samples_per_frame, 0);\r
+               else if(audio != loop_audio())                  \r
+                       audio_data_.insert(audio_data_.end(), audio->begin(), audio->end());\r
+               \r
+               while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
                {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       send(is_running_, false);\r
-                       return nullptr;\r
+                       auto begin = audio_data_.begin(); \r
+                       auto end   = begin + format_desc_.audio_samples_per_frame;\r
+                       auto audio = make_safe<core::audio_buffer>(begin, end);\r
+                       audio_data_.erase(begin, end);\r
+                       audio_buffers_.push(frame_muxer2::audio_source_element_t(audio, element.second));\r
                }\r
+\r
+               return receive_audio();\r
        }\r
-       \r
+                       \r
        virtual void run()\r
        {\r
                try\r
                {\r
-                       send(is_running_, true);\r
-                       while(is_running_.value())\r
+                       bool eof = false;\r
+                       while(!eof)\r
                        {\r
-                               auto audio = receive_audio();   \r
+                               ticket_t tickets;\r
+\r
+                               auto audio_element = receive_audio();\r
+                               boost::range::push_back(tickets, audio_element.second);\r
+\r
+                               auto audio = audio_element.first;\r
                                if(!audio)\r
+                               {\r
+                                       eof = true;\r
                                        break;\r
-                                                                                       \r
-                               auto video = receive_video();\r
+                               }\r
+                               \r
+                               auto video_element = receive_video();\r
+                               boost::range::push_back(tickets, video_element.second);\r
+\r
+                               auto video = video_element.first;\r
                                if(!video)\r
+                               {\r
+                                       eof = true;\r
                                        break;\r
+                               }\r
 \r
                                video->audio_data() = std::move(*audio);\r
 \r
@@ -166,144 +193,73 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                                case display_mode::deinterlace:\r
                                case display_mode::deinterlace_bob:\r
                                        {\r
-                                               send(target_, safe_ptr<core::basic_frame>(std::move(video)));\r
+                                               send(target_, frame_muxer2::target_element_t(video, tickets));\r
 \r
                                                break;\r
                                        }\r
                                case display_mode::duplicate:                                   \r
                                        {                                                                               \r
-                                               send(target_, safe_ptr<core::basic_frame>(video));\r
+                                               send(target_, frame_muxer2::target_element_t(video, tickets));\r
 \r
-                                               auto audio2 = receive_audio();\r
+                                               auto audio_element2 = receive_audio();\r
+                                               boost::range::push_back(tickets, audio_element.second);\r
+\r
+                                               auto audio2     = audio_element2.first;\r
                                                if(audio2)\r
                                                {\r
                                                        auto video2 = make_safe<core::write_frame>(*video);\r
                                                        video2->audio_data() = std::move(*audio2);\r
-                                                       send(target_, safe_ptr<core::basic_frame>(video2));\r
+                                                       send(target_, frame_muxer2::target_element_t(video2, tickets));\r
                                                }\r
+                                               else\r
+                                                       eof = true;\r
 \r
                                                break;\r
                                        }\r
                                case display_mode::half:                                                \r
                                        {                                                               \r
-                                               send(target_, safe_ptr<core::basic_frame>(std::move(video)));\r
-                                               receive_video();\r
+                                               send(target_, frame_muxer2::target_element_t(video, tickets));\r
+\r
+                                               if(!receive_video().first)\r
+                                                       eof = true;\r
+\r
                                                break;\r
                                        }\r
                                case display_mode::deinterlace_bob_reinterlace:\r
                                case display_mode::interlace:                                   \r
                                        {                                       \r
                                                auto frame = safe_ptr<core::basic_frame>(std::move(video));\r
-                                               auto video2 = receive_video();\r
+\r
+                                               auto video_element2 = receive_video();\r
+                                               boost::range::push_back(tickets, video_element.second);\r
+\r
+                                               auto video2 = video_element2.first;\r
                                                auto frame2 = core::basic_frame::empty();\r
 \r
                                                if(video2)                                              \r
-                                                       frame2 = safe_ptr<core::basic_frame>(video2);                                                           \r
+                                                       frame2 = safe_ptr<core::basic_frame>(video2);                   \r
+                                               else\r
+                                                       eof = true;\r
                                                \r
                                                frame = core::basic_frame::interlace(std::move(frame), std::move(frame2), format_desc_.field_mode);     \r
-                                               send(target_, frame);\r
+                                               send(target_, frame_muxer2::target_element_t(frame, tickets));\r
 \r
                                                break;\r
                                        }\r
                                default:        \r
                                        BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
                                }\r
-                       }\r
+                       }       \r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
                \r
-               send(is_running_ , false);\r
-               send(target_, core::basic_frame::eof());\r
+               send(target_, frame_muxer2::target_element_t(core::basic_frame::eof(), ticket_t()));\r
 \r
                done();\r
        }\r
-                       \r
-       void push_video(const safe_ptr<AVFrame>& video_frame)\r
-       {               \r
-               if(video_frame == eof_video() || video_frame == empty_video())\r
-               {\r
-                       send(video_, video_frame);\r
-                       return;\r
-               }\r
-                               \r
-               if(video_frame == loop_video())         \r
-                       return; \r
-               \r
-               try\r
-               {\r
-                       if(!is_running_.value())\r
-                               return;\r
-\r
-                       if(!display_mode_.has_value())\r
-                               initialize_display_mode(*video_frame);\r
-                                               \r
-                       //send(video_, make_safe_ptr(video_frame));\r
-\r
-                       //if(hints & core::frame_producer::ALPHA_HINT)\r
-                       //      video_frame->format = make_alpha_format(video_frame->format);\r
-               \r
-                       auto format = video_frame->format;\r
-                       if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
-                               video_frame->format = PIX_FMT_GRAY8;\r
-\r
-                       filter_.value()->push(video_frame);\r
-\r
-                       while(true)\r
-                       {\r
-                               auto frame = filter_.value()->poll();\r
-                               if(!frame)\r
-                                       break;  \r
-\r
-                               frame->format = format;\r
-                               send(video_, make_safe_ptr(frame));\r
-                       }\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       send(is_running_ , false);\r
-                       send(video_, eof_video());\r
-               }\r
-       }\r
-\r
-       void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
-       {\r
-               if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
-               {\r
-                       send(audio_, audio_samples);\r
-                       return;\r
-               }\r
-\r
-               if(audio_samples == loop_audio())                       \r
-                       return;         \r
-\r
-               try\r
-               {               \r
-                       if(!is_running_.value())\r
-                               return;\r
-\r
-                       audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
-               \r
-                       while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
-                       {\r
-                               auto begin = audio_data_.begin(); \r
-                               auto end   = begin + format_desc_.audio_samples_per_frame;\r
-                                       \r
-                               send(audio_, make_safe<core::audio_buffer>(begin, end));\r
-\r
-                               audio_data_.erase(begin, end);\r
-                       }\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       send(is_running_ , false);\r
-                       send(audio_, eof_audio());\r
-               }\r
-       }\r
 \r
        void initialize_display_mode(AVFrame& frame)\r
        {\r
index 46ed171aba1710e729d127993f26bc01f997e815..a2940c6974d8f2c28ba661927f2475ae08dd1f5f 100644 (file)
@@ -3,6 +3,7 @@
 #include "util.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
 \r
 #include <core/mixer/audio/audio_mixer.h>\r
 \r
@@ -30,9 +31,13 @@ class frame_muxer2 : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<safe_ptr<AVFrame>>                         video_source_t;\r
-       typedef Concurrency::ISource<safe_ptr<core::audio_buffer>>      audio_source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<core::basic_frame>>       target_t;\r
+       typedef std::pair<safe_ptr<AVFrame>, ticket_t>                          video_source_element_t;\r
+       typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t>       audio_source_element_t;\r
+       typedef std::pair<safe_ptr<core::basic_frame>, ticket_t>        target_element_t;\r
+\r
+       typedef Concurrency::ISource<video_source_element_t>            video_source_t;\r
+       typedef Concurrency::ISource<audio_source_element_t>            audio_source_t;\r
+       typedef Concurrency::ITarget<target_element_t>                          target_t;\r
                                                                 \r
        frame_muxer2(video_source_t* video_source,\r
                                 audio_source_t* audio_source, \r
index 534abfc548c41d549cf1678c48107c0c174f4774..11847fa90b1476a103da67a0a884b7d302a30c3b 100644 (file)
@@ -52,6 +52,8 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
+\r
+#undef Yield\r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
@@ -79,7 +81,8 @@ struct input::implementation : public Concurrency::agent, boost::noncopyable
        tbb::atomic<size_t>                                             packets_count_;\r
        tbb::atomic<size_t>                                             packets_size_;\r
 \r
-       bool                                                                    stop_;\r
+       overwrite_buffer<bool>                                  is_running_;\r
+       governor                                                                governor_;\r
                \r
 public:\r
        explicit implementation(input::target_t& target,\r
@@ -98,24 +101,31 @@ public:
                , start_(start)\r
                , length_(length)\r
                , frame_number_(0)\r
-               , stop_(false)\r
+               , governor_(4)\r
        {               \r
                packets_count_  = 0;\r
                packets_size_   = 0;\r
                nb_frames_              = 0;\r
                nb_loops_               = 0;\r
                                \r
-               av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
+               //av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
                                \r
                if(start_ > 0)                  \r
                        seek_frame(start_);\r
                                                                \r
                graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
        }\r
+\r
+       ~implementation()\r
+       {\r
+               if(is_running_.value())\r
+                       stop();\r
+       }\r
        \r
        void stop()\r
        {\r
-               stop_ = true;\r
+               send(is_running_, false);\r
+               governor_.cancel();\r
                agent::wait(this);\r
        }\r
        \r
@@ -123,14 +133,19 @@ public:
        {\r
                try\r
                {\r
-                       while(!stop_)\r
+                       send(is_running_, true);\r
+                       while(is_running_.value())\r
                        {\r
                                auto packet = read_next_packet();\r
                                if(!packet)\r
                                        break;\r
+                               \r
+                               if(packet->stream_index != default_stream_index_)\r
+                                       Concurrency::asend(target_, input::target_element_t(packet, governor_.acquire()));\r
+                               else\r
+                                       Concurrency::asend(target_, input::target_element_t(packet, ticket_t()));\r
 \r
-                               Concurrency::asend(target_, make_safe_ptr(packet));\r
-                               Concurrency::wait(20);\r
+                               Context::Yield();\r
                        }\r
                }\r
                catch(...)\r
@@ -139,7 +154,7 @@ public:
                }       \r
        \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, eof_packet(stream->index));  \r
+                       Concurrency::send(target_, input::target_element_t(eof_packet(stream->index), ticket_t()));     \r
 \r
                done();\r
        }\r
@@ -218,7 +233,7 @@ public:
                packet->size = 0;\r
 \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::asend(target_, loop_packet(stream->index));        \r
+                       Concurrency::asend(target_, input::target_element_t(loop_packet(stream->index), ticket_t()));   \r
 \r
                graph_->add_tag("seek");                \r
        }               \r
index 7db09bd525bfc6b07b2a1d7864a7dbab54b79936..30c6743673261bb6bb97f2a64d009d9b56d01f91 100644 (file)
@@ -22,6 +22,7 @@
 #include "util.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
 \r
 #include <agents.h>\r
 #include <concrt.h>\r
@@ -49,7 +50,9 @@ class input : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
+       typedef std::pair<safe_ptr<AVPacket>, ticket_t> target_element_t;\r
+\r
+       typedef Concurrency::ITarget<target_element_t> target_t;\r
 \r
        explicit input(target_t& target, \r
                                   const safe_ptr<diagnostics::graph>& graph, \r
index 9401421aacd49fa185852c52c8a78c04e51aa775..13e6f0cc6dc7684ec4cb3b960ff8b4bbd2d49595 100644 (file)
@@ -44,6 +44,7 @@ extern "C"
 \r
 #include <tbb/scalable_allocator.h>\r
 \r
+#undef Yield\r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
@@ -60,9 +61,8 @@ struct video_decoder::implementation : public Concurrency::agent, boost::noncopy
        size_t                                                                  height_;\r
        bool                                                                    is_progressive_;\r
        \r
-       overwrite_buffer<bool>                                  is_running_;\r
-       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
-       ITarget<safe_ptr<AVFrame>>&                             target_;\r
+       unbounded_buffer<video_decoder::source_element_t>       source_;\r
+       ITarget<video_decoder::target_element_t>&                       target_;\r
        \r
 public:\r
        explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
@@ -72,10 +72,7 @@ public:
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
                , is_progressive_(true)\r
-               , source_([this](const safe_ptr<AVPacket>& packet)\r
-                       {\r
-                               return packet->stream_index == index_;\r
-                       })\r
+               , source_([this](const video_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
                , target_(target)\r
        {               \r
                CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
@@ -90,7 +87,6 @@ public:
 \r
        ~implementation()\r
        {\r
-               send(is_running_, false);\r
                agent::wait(this);\r
        }\r
 \r
@@ -98,14 +94,14 @@ public:
        {\r
                try\r
                {\r
-                       send(is_running_, true);\r
-                       while(is_running_.value())\r
+                       while(true)\r
                        {\r
-                               auto packet = receive(source_);\r
+                               auto element = receive(source_);\r
+                               auto packet = element.first;\r
                        \r
                                if(packet == loop_packet(index_))\r
                                {\r
-                                       send(target_, loop_video());\r
+                                       send(target_, target_element_t(loop_video(), ticket_t()));\r
                                        continue;\r
                                }\r
 \r
@@ -132,8 +128,8 @@ public:
                                \r
                                // C-TODO: Avoid duplication.\r
                                // Need to dupliace frame data since avcodec_decode_video2 reuses it.\r
-                               send(target_, dup_frame(make_safe_ptr(decoded_frame)));\r
-                               Concurrency::wait(10);\r
+                               send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second));                               \r
+                               Context::Yield();\r
                        }\r
                }\r
                catch(...)\r
@@ -141,8 +137,7 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
                \r
-               send(is_running_, false),\r
-               send(target_, eof_video());\r
+               send(target_, target_element_t(eof_video(), ticket_t()));\r
 \r
                done();\r
        }\r
index a6107c1c61594c6a06fbc543496bc559219781cd..4bf30dac7e90bb5b728340ae49510cba2da8c6b9 100644 (file)
@@ -22,6 +22,7 @@
 #include "../util.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
 \r
 #include <core/video_format.h>\r
 \r
@@ -47,8 +48,11 @@ class video_decoder : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<AVFrame>>  target_t;\r
+       typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
+       typedef std::pair<safe_ptr<AVFrame>, ticket_t> target_element_t;\r
+\r
+       typedef Concurrency::ISource<source_element_t>  source_t;\r
+       typedef Concurrency::ITarget<target_element_t>  target_t;\r
        \r
        explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context);   \r
 \r