]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 12:37:24 +0000 (12:37 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 12:37:24 +0000 (12:37 +0000)
16 files changed:
core/mixer/gpu/ogl_device.cpp
core/mixer/gpu/ogl_device.h
modules/decklink/producer/decklink_producer.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/input.h
modules/ffmpeg/producer/util.cpp
modules/ffmpeg/producer/util.h
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h
modules/flash/producer/flash_producer.cpp
shell/casparcg.config

index a94f154207790a79602f3b60358f4f714121374c..bcdbde7be63c9b07bad70c74517a5581dfc401e1 100644 (file)
@@ -36,7 +36,7 @@
 namespace caspar { namespace core {\r
 \r
 ogl_device::ogl_device() \r
-       : executor_(L"ogl_device")\r
+       : executor_(new executor(L"ogl_device"))\r
        , pattern_(nullptr)\r
        , attached_texture_(0)\r
        , active_shader_(0)\r
@@ -109,7 +109,7 @@ safe_ptr<device_buffer> ogl_device::create_device_buffer(size_t width, size_t he
        auto& pool = device_pools_[stride-1][((width << 16) & 0xFFFF0000) | (height & 0x0000FFFF)];\r
        std::shared_ptr<device_buffer> buffer;\r
        if(!pool->items.try_pop(buffer))                \r
-               buffer = executor_.invoke([&]{return allocate_device_buffer(width, height, stride);}, high_priority);                   \r
+               buffer = executor_->invoke([&]{return allocate_device_buffer(width, height, stride);}, high_priority);                  \r
        \r
        //++pool->usage_count;\r
 \r
@@ -162,13 +162,14 @@ safe_ptr<host_buffer> ogl_device::create_host_buffer(size_t size, host_buffer::u
        auto& pool = host_pools_[usage][size];\r
        std::shared_ptr<host_buffer> buffer;\r
        if(!pool->items.try_pop(buffer))        \r
-               buffer = executor_.invoke([=]{return allocate_host_buffer(size, usage);}, high_priority);       \r
+               buffer = executor_->invoke([=]{return allocate_host_buffer(size, usage);}, high_priority);      \r
        \r
        //++pool->usage_count;\r
 \r
+       auto exe = executor_;\r
        return safe_ptr<host_buffer>(buffer.get(), [=](host_buffer*) mutable\r
        {\r
-               executor_.begin_invoke([=]() mutable\r
+               exe->begin_invoke([=]() mutable\r
                {               \r
                        if(usage == host_buffer::write_only)\r
                                buffer->map();\r
@@ -221,7 +222,7 @@ void ogl_device::flush()
 \r
 void ogl_device::yield()\r
 {\r
-       executor_.yield();\r
+       executor_->yield();\r
 }\r
 \r
 boost::unique_future<void> ogl_device::gc()\r
index 872046c0b3b46072f26ef5cd782750622af8c3f2..1502af51cbf9e4998657cf5b1c1179c6ae649cca 100644 (file)
@@ -74,7 +74,7 @@ class ogl_device : boost::noncopyable
        \r
        unsigned int fbo_;\r
 \r
-       executor executor_;\r
+       safe_ptr<executor> executor_;\r
                                \r
 public:                \r
        ogl_device();\r
@@ -104,13 +104,13 @@ public:
        template<typename Func>\r
        auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
        {                       \r
-               return executor_.begin_invoke(std::forward<Func>(func), priority);\r
+               return executor_->begin_invoke(std::forward<Func>(func), priority);\r
        }\r
        \r
        template<typename Func>\r
        auto invoke(Func&& func, task_priority priority = normal_priority) -> decltype(func())\r
        {\r
-               return executor_.invoke(std::forward<Func>(func), priority);\r
+               return executor_->invoke(std::forward<Func>(func), priority);\r
        }\r
                \r
        safe_ptr<device_buffer> create_device_buffer(size_t width, size_t height, size_t stride);\r
index 6f4a08d5350150b79ced9ad74b44d3452b9fff1c..f4e1d38531b5ac5f2c141f9dde0c656705643bcd 100644 (file)
@@ -172,9 +172,9 @@ public:
        \r
 class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
 {              \r
-       Concurrency::bounded_buffer<std::shared_ptr<AVFrame>>                           video_frames_;\r
-       Concurrency::bounded_buffer<std::shared_ptr<core::audio_buffer>>        audio_buffers_;\r
-       Concurrency::bounded_buffer<safe_ptr<core::basic_frame>>                        muxed_frames_;\r
+       Concurrency::bounded_buffer<ffmpeg::video_message_t>    video_frames_;\r
+       Concurrency::bounded_buffer<ffmpeg::audio_message_t>    audio_buffers_;\r
+       Concurrency::bounded_buffer<ffmpeg::frame_message_t>    muxed_frames_;\r
 \r
        const core::video_format_desc           format_desc_;\r
        const size_t                                            device_index_;\r
@@ -188,6 +188,8 @@ class decklink_producer_proxy : public Concurrency::agent, public core::frame_pr
 \r
        mutable Concurrency::single_assignment<std::wstring> print_;\r
 \r
+       safe_ptr<Concurrency::semaphore> semaphore_;\r
+\r
        volatile bool is_running_;\r
 public:\r
 \r
@@ -202,6 +204,7 @@ public:
                , filter_(filter_str)\r
                , muxer_(&video_frames_, &audio_buffers_, muxed_frames_, ffmpeg::double_rate(filter_str) ? format_desc.fps * 2.0 : format_desc.fps, frame_factory)\r
                , is_running_(true)\r
+               , semaphore_(make_safe<Concurrency::semaphore>(3))\r
        {\r
                agent::start();\r
        }\r
@@ -218,7 +221,8 @@ public:
 \r
                try\r
                {\r
-                       last_frame_ = frame = safe_ptr<core::basic_frame>(Concurrency::receive(muxed_frames_));\r
+                       auto message = Concurrency::receive(muxed_frames_);\r
+                       last_frame_ = frame = make_safe_ptr(message->payload);\r
                }\r
                catch(Concurrency::operation_timed_out&)\r
                {               \r
@@ -294,7 +298,7 @@ public:
                                                auto frame = filter_.poll();\r
                                                if(!frame)\r
                                                        break;\r
-                                               Concurrency::send(video_frames_, frame);\r
+                                               Concurrency::send(video_frames_, ffmpeg::make_message(frame, std::make_shared<ffmpeg::token>(semaphore_)));\r
                                        }\r
                                },\r
                                [&]\r
@@ -304,10 +308,10 @@ public:
                                        {\r
                                                auto sample_frame_count = audio->GetSampleFrameCount();\r
                                                auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
-                                               Concurrency::send(audio_buffers_, std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
+                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), std::make_shared<ffmpeg::token>(semaphore_)));\r
                                        }\r
                                        else\r
-                                               Concurrency::send(audio_buffers_, ffmpeg::empty_audio());       \r
+                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(ffmpeg::empty_audio(), std::make_shared<ffmpeg::token>(semaphore_)));    \r
                                });\r
                        }\r
 \r
index c7347a500cbe1c2607245f785300cdc8fbc2c82c..6cd83611de45fd30f0db74eb89714bb4b10f5f2b 100644 (file)
@@ -56,7 +56,7 @@ struct audio_decoder::implementation : boost::noncopyable
        \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
 \r
-       Concurrency::transformer<std::shared_ptr<AVPacket>, std::shared_ptr<core::audio_buffer>> transformer_;\r
+       Concurrency::transformer<packet_message_t, audio_message_t> transformer_;\r
        \r
 public:\r
        explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
@@ -65,9 +65,9 @@ public:
                                         format_desc.audio_sample_rate, codec_context_->sample_rate,\r
                                         AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const std::shared_ptr<AVPacket>& packet)\r
+               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
                        {\r
-                               return packet && packet->stream_index == index_;\r
+                               return message->payload && message->payload->stream_index == index_;\r
                        })\r
        {                               \r
                CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
@@ -75,18 +75,20 @@ public:
                Concurrency::connect(source, transformer_);\r
        }\r
 \r
-       std::shared_ptr<core::audio_buffer> decode(const std::shared_ptr<AVPacket>& packet)\r
+       audio_message_t decode(const packet_message_t& message)\r
        {               \r
+               auto packet = message->payload;\r
+\r
                if(!packet)\r
-                       return nullptr;\r
+                       return make_message(std::shared_ptr<core::audio_buffer>());\r
 \r
                if(packet == loop_packet(index_))\r
-                       return loop_audio();\r
+                       return make_message(loop_audio());\r
 \r
                if(packet == eof_packet(index_))\r
-                       return eof_audio();\r
+                       return make_message(eof_audio());\r
 \r
-               auto result = make_safe<core::audio_buffer>();\r
+               auto result = std::make_shared<core::audio_buffer>();\r
 \r
                while(packet->size > 0)\r
                {\r
@@ -109,7 +111,7 @@ public:
                        result->insert(result->end(), samples, samples + n_samples);\r
                }\r
                                \r
-               return result;\r
+               return make_message(result, message->token);\r
        }\r
 };\r
 \r
index 7f02f562af1d7a4fde8b813eee44beeb6c4b553f..4cb89ce5d171de1b618f388ebe3f47ff005da9ab 100644 (file)
@@ -19,6 +19,8 @@
 */\r
 #pragma once\r
 \r
+#include "../util.h"\r
+\r
 #include <core/mixer/audio/audio_mixer.h>\r
 \r
 #include <common/memory/safe_ptr.h>\r
@@ -45,8 +47,8 @@ class audio_decoder : boost::noncopyable
 {\r
 public:\r
 \r
-       typedef Concurrency::ISource<std::shared_ptr<AVPacket>>& source_t;\r
-       typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>>& target_t;\r
+       typedef Concurrency::ISource<packet_message_t>& source_t;\r
+       typedef Concurrency::ITarget<audio_message_t>& target_t;\r
        \r
        explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
        \r
index 956648131357df18790b056ffccf594adbc8a47d..d1b684ef5ab11fde07b0653a3ae1662ed3031c59 100644 (file)
@@ -62,18 +62,18 @@ struct ffmpeg_producer : public core::frame_producer
        const bool                                                              loop_;\r
        const size_t                                                    length_;\r
        \r
-       Concurrency::unbounded_buffer<std::shared_ptr<AVPacket>>                        packets_;\r
-       Concurrency::unbounded_buffer<std::shared_ptr<AVFrame>>                         video_;\r
-       Concurrency::unbounded_buffer<std::shared_ptr<core::audio_buffer>>      audio_;\r
-       Concurrency::bounded_buffer<safe_ptr<core::basic_frame>>                        frames_;\r
+       Concurrency::unbounded_buffer<packet_message_t> packets_;\r
+       Concurrency::unbounded_buffer<video_message_t>  video_;\r
+       Concurrency::unbounded_buffer<audio_message_t>  audio_;\r
+       Concurrency::bounded_buffer<frame_message_t>    frames_;\r
+       Concurrency::call<packet_message_t>                             throw_away_;\r
                \r
        const safe_ptr<diagnostics::graph>              graph_;\r
                                        \r
-       input                                                                                   input_; \r
-       std::shared_ptr<video_decoder>                                  video_decoder_;\r
-       std::shared_ptr<audio_decoder>                                  audio_decoder_; \r
-       Concurrency::call<std::shared_ptr<AVPacket>>    throw_away_;\r
-       std::unique_ptr<frame_muxer2>                                   muxer_;\r
+       input                                                                   input_; \r
+       std::shared_ptr<video_decoder>                  video_decoder_;\r
+       std::shared_ptr<audio_decoder>                  audio_decoder_; \r
+       std::unique_ptr<frame_muxer2>                   muxer_;\r
 \r
        safe_ptr<core::basic_frame>                             last_frame_;\r
        \r
@@ -83,7 +83,7 @@ public:
                , start_(start)\r
                , loop_(loop)\r
                , length_(length)\r
-               , throw_away_([](const std::shared_ptr<AVPacket>&){})\r
+               , throw_away_([](const packet_message_t&){})\r
                , frames_(2)\r
                , graph_(diagnostics::create_graph("", false))\r
                , input_(packets_, graph_, filename_, loop, start, length)\r
@@ -137,7 +137,7 @@ public:
        ~ffmpeg_producer()\r
        {\r
                input_.stop();                  \r
-               while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
+               while(Concurrency::receive(frames_)->payload != core::basic_frame::eof())\r
                {\r
                }\r
        }\r
@@ -148,7 +148,8 @@ public:
                \r
                try\r
                {               \r
-                       frame = last_frame_ = safe_ptr<core::basic_frame>(Concurrency::receive(frames_, 10));\r
+                       auto message = Concurrency::receive(frames_, 10);\r
+                       frame = last_frame_ = make_safe_ptr(message->payload);\r
                        graph_->update_text(narrow(print()));\r
                }\r
                catch(Concurrency::operation_timed_out&)\r
index e014b6f87322c157936e49ffb00e53107111e054..3e487dfcd30abe21a589d93d9e5c7fa459426cae 100644 (file)
@@ -132,27 +132,29 @@ display_mode::type get_display_mode(const core::field_mode::type in_mode, double
 \r
 struct frame_muxer2::implementation : boost::noncopyable\r
 {              \r
+       typedef std::shared_ptr<message<core::write_frame>> write_frame_message_t;\r
+\r
        display_mode::type                                                                                                              display_mode_;\r
        const double                                                                                                                    in_fps_;\r
        const video_format_desc                                                                                                 format_desc_;\r
        bool                                                                                                                                    auto_transcode_;\r
        \r
        filter                                                                                                                                  filter_;\r
-       safe_ptr<core::frame_factory>                                                                                   frame_factory_;\r
+       const safe_ptr<core::frame_factory>                                                                             frame_factory_;\r
        \r
-       Concurrency::call<std::shared_ptr<AVFrame>>                                                             push_video_;\r
-       Concurrency::call<std::shared_ptr<core::audio_buffer>>                                  push_audio_;\r
+       Concurrency::call<video_message_t>                                                                              push_video_;\r
+       Concurrency::call<audio_message_t>                                                                              push_audio_;\r
        \r
-       Concurrency::transformer<safe_ptr<AVFrame>, std::shared_ptr<core::write_frame>> video_;\r
-       Concurrency::unbounded_buffer<std::shared_ptr<core::audio_buffer>>                              audio_;\r
+       Concurrency::transformer<video_message_t, write_frame_message_t>                video_;\r
+       Concurrency::unbounded_buffer<audio_message_t>                                                  audio_;\r
 \r
-       typedef std::tuple<std::shared_ptr<core::write_frame>, std::shared_ptr<core::audio_buffer>> join_element_t;\r
+       typedef std::tuple<write_frame_message_t, audio_message_t> join_element_t;\r
        \r
-       Concurrency::transformer<join_element_t, safe_ptr<core::basic_frame>>   merge_;\r
+       Concurrency::transformer<join_element_t, frame_message_t>                               merge_;\r
        safe_ptr<Concurrency::ISource<join_element_t>>                                                  join_;\r
 \r
        core::audio_buffer                                                                                                              audio_data_;\r
-       std::queue<safe_ptr<AVFrame>>                                                                                   video_frames_;\r
+       std::queue<video_message_t>                                                                                             video_frames_;\r
                                                        \r
        implementation(frame_muxer2::video_source_t* video_source,\r
                                   frame_muxer2::audio_source_t* audio_source,\r
@@ -164,7 +166,7 @@ struct frame_muxer2::implementation : boost::noncopyable
                , format_desc_(frame_factory->get_video_format_desc())\r
                , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
                , frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
-               , video_(std::bind(&make_write_frame, this, std::placeholders::_1, frame_factory, 0))\r
+               , video_(std::bind(&implementation::make_write_frame, this, std::placeholders::_1))\r
                , push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
                , push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
                , merge_(std::bind(&implementation::merge, this, std::placeholders::_1), &target)\r
@@ -178,26 +180,44 @@ struct frame_muxer2::implementation : boost::noncopyable
                join_->link_target(&merge_);\r
        }\r
 \r
-       safe_ptr<core::basic_frame> merge(const join_element_t& element)\r
+       frame_message_t merge(const join_element_t& element)\r
        {\r
-               //if(std::get<0>(element) == eof_video() || std::get<1>(element) == eof_audio())\r
-               //      return core::basic_frame::eof();\r
-               auto frame = std::get<0>(element);\r
-               frame->audio_data() = std::move(*std::get<1>(element));\r
-               return make_safe_ptr(frame);\r
+               if(!std::get<0>(element)->payload || !std::get<1>(element)->payload)\r
+                       return make_message(std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));\r
+\r
+               auto& frame = std::get<0>(element)->payload;\r
+               frame->audio_data() = std::move(*std::get<1>(element)->payload);\r
+               return make_message(std::shared_ptr<core::basic_frame>(frame), std::get<0>(element)->token);\r
+       }\r
+\r
+       write_frame_message_t make_write_frame(const video_message_t& message)\r
+       {\r
+               if(!message->payload)\r
+                       return make_message(std::shared_ptr<core::write_frame>(), message->token);\r
+\r
+               auto frame = ffmpeg::make_write_frame(this, make_safe_ptr(message->payload), frame_factory_, 0);\r
+               return make_message(std::shared_ptr<core::write_frame>(frame), message->token);\r
        }\r
        \r
-       void push_video(const std::shared_ptr<AVFrame>& video_frame)\r
+       void push_video(const video_message_t& message)\r
        {               \r
+               auto video_frame = message->payload;\r
+\r
                if(!video_frame)\r
                        return;\r
+               \r
+               if(video_frame == eof_video())\r
+               {\r
+                       Concurrency::send(video_, make_message(std::shared_ptr<AVFrame>()));\r
+                       return;\r
+               }\r
 \r
                if(video_frame == loop_video())         \r
                        return; \r
                                \r
                if(video_frame == empty_video())\r
                {\r
-                       Concurrency::send(video_, make_safe_ptr(empty_video()));\r
+                       Concurrency::send(video_, make_message(empty_video(), message->token));\r
                        return;\r
                }\r
                \r
@@ -245,7 +265,7 @@ struct frame_muxer2::implementation : boost::noncopyable
                {               \r
                        av_frame->format = format;\r
                        \r
-                       video_frames_.push(av_frame);\r
+                       video_frames_.push(make_message(std::shared_ptr<AVFrame>(av_frame), std::move(message->token)));\r
 \r
                        switch(display_mode_)\r
                        {\r
@@ -255,6 +275,7 @@ struct frame_muxer2::implementation : boost::noncopyable
                                {\r
                                        Concurrency::send(video_, video_frames_.front());\r
                                        video_frames_.pop();\r
+\r
                                        break;\r
                                }\r
                        case display_mode::duplicate:                                   \r
@@ -266,6 +287,7 @@ struct frame_muxer2::implementation : boost::noncopyable
                                        video_frames_.pop();\r
                                        Concurrency::send(video_, video_frames_.front());\r
                                        video_frames_.pop();\r
+\r
                                        break;\r
                                }\r
                        case display_mode::half:                                                \r
@@ -300,16 +322,24 @@ struct frame_muxer2::implementation : boost::noncopyable
                }\r
        }\r
 \r
-       void push_audio(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
+       void push_audio(const audio_message_t& message)\r
        {\r
+               auto audio_samples = message->payload;\r
+\r
                if(!audio_samples)\r
                        return;\r
 \r
+               if(audio_samples == eof_audio())\r
+               {\r
+                       Concurrency::send(audio_, make_message(std::shared_ptr<core::audio_buffer>()));\r
+                       return;\r
+               }\r
+\r
                if(audio_samples == loop_audio())                       \r
                        return;         \r
 \r
                if(audio_samples == empty_audio())              \r
-                       Concurrency::send(audio_, std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0));               \r
+                       Concurrency::send(audio_, make_message(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0), message->token));         \r
 \r
                audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
                \r
@@ -318,7 +348,7 @@ struct frame_muxer2::implementation : boost::noncopyable
                        auto begin = audio_data_.begin(); \r
                        auto end   = begin + format_desc_.audio_samples_per_frame;\r
                                        \r
-                       Concurrency::send(audio_, std::make_shared<core::audio_buffer>(begin, end));\r
+                       Concurrency::send(audio_, make_message(std::make_shared<core::audio_buffer>(begin, end), message->token));\r
                        audio_data_.erase(begin, end);\r
                }\r
        }\r
index 8650c0c0b65e7a346628573504f5a4e886101a2f..36f0bf4ba9ea37fc0e42a887608919b03006845f 100644 (file)
@@ -1,5 +1,7 @@
 #pragma once\r
 \r
+#include "util.h"\r
+\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <core/mixer/audio/audio_mixer.h>\r
@@ -29,10 +31,10 @@ class frame_muxer2 : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<std::shared_ptr<AVFrame>>                          video_source_t;\r
-       typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>>       audio_source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<core::basic_frame>>                       target_t;\r
-\r
+       typedef Concurrency::ISource<video_message_t>   video_source_t;\r
+       typedef Concurrency::ISource<audio_message_t>   audio_source_t;\r
+       typedef Concurrency::ITarget<frame_message_t>   target_t;\r
+                                                                \r
        frame_muxer2(video_source_t* video_source,\r
                                 audio_source_t* audio_source, \r
                                 target_t& target,\r
index 20bd7df65f8e85923c712251b4e6698a11474c96..3dfe2035b2714b86e9975df32d81ee7a40762405 100644 (file)
@@ -57,7 +57,7 @@ using namespace Concurrency;
 \r
 namespace caspar { namespace ffmpeg {\r
 \r
-static const size_t MAX_BUFFER_COUNT = 32;\r
+static const size_t MAX_TOKENS = 32;\r
        \r
 struct input::implementation : public Concurrency::agent, boost::noncopyable\r
 {\r
@@ -78,8 +78,11 @@ struct input::implementation : public Concurrency::agent, boost::noncopyable
        tbb::atomic<size_t>                                             nb_frames_;\r
        tbb::atomic<size_t>                                             nb_loops_;      \r
        tbb::atomic<size_t>                                             packets_count_;\r
+       tbb::atomic<size_t>                                             packets_size_;\r
 \r
        bool                                                                    stop_;\r
+\r
+       safe_ptr<Concurrency::semaphore>                semaphore_;\r
        \r
 public:\r
        explicit implementation(input::target_t& target,\r
@@ -99,8 +102,10 @@ public:
                , length_(length)\r
                , frame_number_(0)\r
                , stop_(false)\r
+               , semaphore_(make_safe<Concurrency::semaphore>(MAX_TOKENS))\r
        {               \r
                packets_count_  = 0;\r
+               packets_size_   = 0;\r
                nb_frames_              = 0;\r
                nb_loops_               = 0;\r
                                \r
@@ -110,12 +115,13 @@ public:
                        seek_frame(start_);\r
                                                                \r
                graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
-               graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
-               graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
        }\r
-\r
-       ~implementation()\r
+       \r
+       void stop()\r
        {\r
+               stop_ = true;\r
+               for(size_t n = 0; n < format_context_->nb_streams+1; ++n)\r
+                       semaphore_->release();\r
                agent::wait(this);\r
        }\r
        \r
@@ -123,8 +129,37 @@ public:
        {\r
                try\r
                {\r
-                       while(!stop_ && read_next_packet())\r
+                       while(!stop_)\r
                        {\r
+                               auto packet = read_next_packet();\r
+                               if(!packet)\r
+                                       break;\r
+\r
+                               Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
+                               Concurrency::wait(0);\r
+\r
+                               //std::vector<std::shared_ptr<AVPacket>> buffer;\r
+\r
+                               //while(buffer.size() < 100 && !stop_)\r
+                               //{\r
+                               //      Concurrency::scoped_oversubcription_token oversubscribe;\r
+                               //      auto packet = read_next_packet();\r
+                               //      if(!packet)\r
+                               //              stop_ = true;\r
+                               //      else\r
+                               //              buffer.push_back(packet);\r
+                               //}\r
+                               //                              \r
+                               //std::stable_partition(buffer.begin(), buffer.end(), [this](const std::shared_ptr<AVPacket>& packet)\r
+                               //{\r
+                               //      return packet->stream_index != default_stream_index_;\r
+                               //});\r
+\r
+                               //BOOST_FOREACH(auto packet, buffer)\r
+                               //{\r
+                               //      Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
+                               //      Concurrency::wait(0);\r
+                               //}\r
                        }\r
                }\r
                catch(...)\r
@@ -133,12 +168,15 @@ public:
                }       \r
        \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, eof_packet(stream->index));  \r
-                                                       \r
+               {\r
+                       Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_)));       \r
+                       Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_)));       \r
+               }\r
+\r
                done();\r
        }\r
 \r
-       bool read_next_packet()\r
+       std::shared_ptr<AVPacket> read_next_packet()\r
        {               \r
                auto packet = create_packet();\r
                \r
@@ -161,42 +199,35 @@ public:
                        else\r
                        {\r
                                CASPAR_LOG(trace) << print() << " Stopping.";\r
-                               return false;\r
+                               return nullptr;\r
                        }\r
                }\r
-               else\r
-               {               \r
-                       THROW_ON_ERROR(ret, print(), "av_read_frame");\r
-\r
-                       if(packet->stream_index == default_stream_index_)\r
-                       {\r
-                               if(nb_loops_ == 0)\r
-                                       ++nb_frames_;\r
-                               ++frame_number_;\r
-                       }\r
 \r
-                       THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
-                               \r
-                       // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
-                       auto size = packet->size;\r
-                       auto data = packet->data;                       \r
+               THROW_ON_ERROR(ret, print(), "av_read_frame");\r
 \r
-                       packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
-                       {\r
-                               packet->size = size;\r
-                               packet->data = data;\r
-                               --packets_count_;\r
-                               graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT);     \r
-                       });\r
+               if(packet->stream_index == default_stream_index_)\r
+               {\r
+                       if(nb_loops_ == 0)\r
+                               ++nb_frames_;\r
+                       ++frame_number_;\r
+               }\r
 \r
-                       ++packets_count_;\r
+               THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+                               \r
+               // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+               auto size = packet->size;\r
+               auto data = packet->data;                       \r
 \r
-                       Concurrency::asend(target_, packet);\r
-                                       \r
-                       graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT);\r
-               }       \r
+               packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+               {\r
+                       packet->size = size;\r
+                       packet->data = data;\r
+                       --packets_count_;\r
+               });\r
 \r
-               return true;\r
+               ++packets_count_;\r
+                                                       \r
+               return packet;\r
        }\r
 \r
        void seek_frame(int64_t frame, int flags = 0)\r
@@ -215,10 +246,10 @@ public:
 \r
                THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print());    \r
                auto packet = create_packet();\r
-               packet->size = 0;               \r
+               packet->size = 0;\r
 \r
                BOOST_FOREACH(auto stream, streams_)\r
-                       Concurrency::send(target_, loop_packet(stream->index)); \r
+                       Concurrency::send(target_, make_message(loop_packet(stream->index), std::make_shared<token>(semaphore_)));      \r
 \r
                graph_->add_tag("seek");                \r
        }               \r
@@ -273,7 +304,7 @@ void input::start()
 \r
 void input::stop()\r
 {\r
-       impl_->stop_ = true;\r
+       impl_->stop();\r
 }\r
 \r
 }}
\ No newline at end of file
index 9259bf77087e9da8b009722ef6c62dcf3fd525e8..ce704c8000f7d74b1197cb38a0bf1fa15fca6f41 100644 (file)
@@ -19,6 +19,8 @@
 */\r
 #pragma once\r
 \r
+#include "util.h"\r
+\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <agents.h>\r
@@ -47,7 +49,7 @@ class input : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
+       typedef Concurrency::ITarget<packet_message_t> target_t;\r
 \r
        explicit input(target_t& target, \r
                                   const safe_ptr<diagnostics::graph>& graph, \r
index e9687f5b07d42f95be0e4000f0ce5402faa42747..15d656c8bed641f095662d1a479f734efafa6cbf 100644 (file)
@@ -311,10 +311,10 @@ std::shared_ptr<AVPacket> create_packet()
 const std::shared_ptr<AVPacket>& loop_packet(int index)\r
 {\r
        static Concurrency::critical_section mutex;\r
-       static std::map<int, std::shared_ptr<AVPacket>> packets;\r
-\r
        Concurrency::critical_section::scoped_lock lock(mutex);\r
 \r
+       static std::map<int, std::shared_ptr<AVPacket>> packets;\r
+       \r
        auto& packet = packets[index];\r
        if(!packet)\r
        {\r
@@ -328,10 +328,10 @@ const std::shared_ptr<AVPacket>& loop_packet(int index)
 const std::shared_ptr<AVPacket>& eof_packet(int index)\r
 {\r
        static Concurrency::critical_section mutex;\r
-       static std::map<int, std::shared_ptr<AVPacket>> packets;\r
-\r
        Concurrency::critical_section::scoped_lock lock(mutex);\r
 \r
+       static std::map<int, std::shared_ptr<AVPacket>> packets;\r
+       \r
        auto& packet = packets[index];\r
        if(!packet)\r
        {\r
index 400361f583fc46a4c53e63d03630651290e24555..c7d585089f900f821de29b71d9244984c641967f 100644 (file)
@@ -21,6 +21,7 @@ extern "C"
 #endif\r
 \r
 #include <agents.h>\r
+#include <semaphore.h>\r
 \r
 struct AVFrame;\r
 struct AVFormatContext;\r
@@ -38,6 +39,46 @@ struct frame_factory;
 \r
 namespace ffmpeg {\r
        \r
+class token\r
+{\r
+       safe_ptr<Concurrency::semaphore> semaphore_;\r
+public:\r
+       token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
+               : semaphore_(semaphore)\r
+       {\r
+               semaphore_->acquire();\r
+       }\r
+\r
+       ~token()\r
+       {\r
+               semaphore_->release();\r
+       }\r
+};\r
+\r
+template <typename T>\r
+struct message\r
+{\r
+       message(const std::shared_ptr<T>& payload, const std::shared_ptr<token>& token = nullptr)\r
+               : payload(payload)\r
+               , token(token)\r
+       {\r
+       }\r
+\r
+       std::shared_ptr<T>         payload;\r
+       std::shared_ptr<token> token;\r
+};\r
+\r
+template<typename T>\r
+std::shared_ptr<message<T>> make_message(const std::shared_ptr<T>& payload, const std::shared_ptr<token>& token = nullptr)\r
+{\r
+       return std::make_shared<message<T>>(payload, token);\r
+}\r
+\r
+typedef std::shared_ptr<message<AVPacket>>                             packet_message_t;\r
+typedef std::shared_ptr<message<AVFrame>>                              video_message_t;\r
+typedef std::shared_ptr<message<core::audio_buffer>>   audio_message_t;\r
+typedef std::shared_ptr<message<core::basic_frame>>            frame_message_t;\r
+       \r
 static const PixelFormat       CASPAR_PIX_FMT_LUMA = PIX_FMT_MONOBLACK; // Just hijack some unual pixel format.\r
 \r
 core::field_mode::type         get_mode(AVFrame& frame);\r
index 6f2aed065f5e35ead08449d826dddb608963541e..e840a6d427c443081ba949116d05a45edf4ae059 100644 (file)
@@ -58,9 +58,9 @@ struct video_decoder::implementation : boost::noncopyable
        size_t                                                                  height_;\r
        bool                                                                    is_progressive_;\r
        \r
-       Concurrency::transformer<std::shared_ptr<AVPacket>, std::shared_ptr<AVFrame>> transformer_;\r
+       Concurrency::transformer<packet_message_t, video_message_t> transformer_;\r
        \r
-       Concurrency::semaphore semaphore_;\r
+       safe_ptr<Concurrency::semaphore> semaphore_;\r
 \r
 public:\r
        explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
@@ -70,11 +70,11 @@ public:
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
                , is_progressive_(true)\r
-               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const std::shared_ptr<AVPacket>& packet)\r
+               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
                        {\r
-                               return packet && packet->stream_index == index_;\r
+                               return message->payload && message->payload->stream_index == index_;\r
                        })\r
-               , semaphore_(1)\r
+               , semaphore_(make_safe<Concurrency::semaphore>(1))\r
        {               \r
                CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
                \r
@@ -84,23 +84,24 @@ public:
                Concurrency::connect(source, transformer_);\r
        }\r
                \r
-       std::shared_ptr<AVFrame> decode(const std::shared_ptr<AVPacket>& packet)\r
+       video_message_t decode(const packet_message_t& message)\r
        {\r
+               auto packet = message->payload;\r
+\r
                if(!packet)\r
-                       return nullptr;\r
+                       return make_message(std::shared_ptr<AVFrame>());\r
 \r
                if(packet == loop_packet(index_))\r
-                       return loop_video();\r
+                       return make_message(loop_video());\r
 \r
                if(packet == eof_packet(index_))\r
-                       return eof_video();\r
+                       return make_message(eof_video());\r
                \r
-               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this](AVFrame* frame)\r
+               token token(semaphore_);\r
+               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
                {\r
                        av_free(frame);\r
-                       semaphore_.release();\r
                });\r
-               semaphore_.acquire();\r
 \r
                int frame_finished = 0;\r
                THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
@@ -111,14 +112,15 @@ public:
                // AVParser or demuxer which puted more then one frame in a AVPacket.\r
 \r
                if(frame_finished == 0) \r
-                       return nullptr;\r
+                       return make_message(std::shared_ptr<AVFrame>());\r
 \r
                if(decoded_frame->repeat_pict > 0)\r
                        CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
                \r
                is_progressive_ = decoded_frame->interlaced_frame == 0;\r
                                \r
-               return decoded_frame;\r
+               Concurrency::wait(10);\r
+               return make_message(decoded_frame, message->token);\r
        }\r
                \r
        double fps() const\r
index a0be752553084dd2d761451bba7d5189ebb4aa9d..eefa91d4ff82d2372823dc1143d164c231680942 100644 (file)
@@ -19,6 +19,8 @@
 */\r
 #pragma once\r
 \r
+#include "../util.h"\r
+\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <core/video_format.h>\r
@@ -45,8 +47,8 @@ class video_decoder : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
-       typedef Concurrency::ITarget<std::shared_ptr<AVFrame>>  target_t;\r
+       typedef Concurrency::ISource<packet_message_t> source_t;\r
+       typedef Concurrency::ITarget<video_message_t>  target_t;\r
        \r
        explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context);   \r
 \r
index ad13a840ea90e23ff173cacf9029f1c40500a3fa..46c83700e7c29b17df3ef1e4175c5efdb5e2d296 100644 (file)
@@ -147,9 +147,9 @@ class flash_renderer
        \r
 public:\r
        flash_renderer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) \r
-               : graph_(graph)\r
-               , filename_(filename)\r
+               : filename_(filename)\r
                , frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
+               , graph_(graph)\r
                , ax_(nullptr)\r
                , head_(core::basic_frame::empty())\r
                , bmp_(width, height)\r
@@ -200,11 +200,6 @@ public:
                CASPAR_LOG(info) << print() << L" Thread ended.";\r
        }\r
        \r
-       void make_write_frame(const std::shared_ptr<bitmap>& bmp)\r
-       {\r
-\r
-       }\r
-\r
        void param(const std::wstring& param)\r
        {               \r
                if(!ax_->FlashCall(param))\r
@@ -214,7 +209,7 @@ public:
        \r
        safe_ptr<core::basic_frame> render_frame(bool has_underflow)\r
        {\r
-               float frame_time = 1.0f/ax_->GetFPS();\r
+               const float frame_time = 1.0f/ax_->GetFPS();\r
 \r
                graph_->update_value("tick-time", static_cast<float>(tick_timer_.elapsed()/frame_time)*0.5f);\r
                tick_timer_.restart();\r
@@ -318,7 +313,7 @@ public:
                                ~co_init() {CoUninitialize();}\r
                        } init;\r
 \r
-                       flash_renderer renderer(safe_ptr<diagnostics::graph>(graph_), frame_factory_, filename_, width_, height_);\r
+                       flash_renderer renderer(graph_, frame_factory_, filename_, width_, height_);\r
 \r
                        is_running_ = true;\r
                        while(is_running_)\r
index 28f1006635ad693c797d2c3c362ba3771346c581..9f02867165e0096d5c33a7b9547cbf915bf0ded6 100644 (file)
@@ -43,7 +43,7 @@
     </producers>\r
     <channels>\r
       <channel>\r
-        <video-mode>1080p5000</video-mode>\r
+        <video-mode>720p5000</video-mode>\r
         <consumers>\r
           <decklink>\r
             <device>1</device>\r