]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 14:40:28 +0000 (14:40 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 22 Oct 2011 14:40:28 +0000 (14:40 +0000)
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/video/video_decoder.cpp

index 6cd83611de45fd30f0db74eb89714bb4b10f5f2b..13ba7a2b0c94e426ff0d58c8d1a0ee77b85487a9 100644 (file)
@@ -45,9 +45,11 @@ extern "C"
 #include <connect.h>\r
 #include <semaphore.h>\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct audio_decoder::implementation : boost::noncopyable\r
+struct audio_decoder::implementation : public agent, boost::noncopyable\r
 {      \r
        int                                                                                                                     index_;\r
        std::shared_ptr<AVCodecContext>                                                         codec_context_;         \r
@@ -56,7 +58,9 @@ struct audio_decoder::implementation : boost::noncopyable
        \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
 \r
-       Concurrency::transformer<packet_message_t, audio_message_t> transformer_;\r
+       overwrite_buffer<bool>                          is_running_;\r
+       unbounded_buffer<packet_message_t>      source_;\r
+       ITarget<audio_message_t>&                       target_;\r
        \r
 public:\r
        explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
@@ -65,53 +69,80 @@ public:
                                         format_desc.audio_sample_rate, codec_context_->sample_rate,\r
                                         AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
+               , source_([this](const packet_message_t& message)\r
                        {\r
                                return message->payload && message->payload->stream_index == index_;\r
                        })\r
+               , target_(target)\r
        {                               \r
                CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
 \r
-               Concurrency::connect(source, transformer_);\r
+               Concurrency::connect(source, source_);\r
+\r
+               start();\r
        }\r
 \r
-       audio_message_t decode(const packet_message_t& message)\r
-       {               \r
-               auto packet = message->payload;\r
+       ~implementation()\r
+       {\r
+               send(is_running_, false);\r
+               agent::wait(this);\r
+       }\r
 \r
-               if(!packet)\r
-                       return make_message(std::shared_ptr<core::audio_buffer>());\r
+       virtual void run()\r
+       {\r
+               try\r
+               {\r
+                       send(is_running_, true);\r
+                       while(is_running_.value())\r
+                       {                               \r
+                               auto message = receive(source_);\r
+                               auto packet = message->payload;\r
+                       \r
+                               if(!packet)\r
+                                       continue;\r
 \r
-               if(packet == loop_packet(index_))\r
-                       return make_message(loop_audio());\r
+                               if(packet == loop_packet(index_))\r
+                               {\r
+                                       send(target_, make_message(loop_audio()));\r
+                                       break;\r
+                               }\r
 \r
-               if(packet == eof_packet(index_))\r
-                       return make_message(eof_audio());\r
+                               if(packet == eof_packet(index_))\r
+                                       break;\r
 \r
-               auto result = std::make_shared<core::audio_buffer>();\r
+                               auto result = std::make_shared<core::audio_buffer>();\r
 \r
-               while(packet->size > 0)\r
-               {\r
-                       buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
-                       int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+                               while(packet->size > 0)\r
+                               {\r
+                                       buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+                                       int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
                \r
-                       int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+                                       int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
 \r
-                       // There might be several frames in one packet.\r
-                       packet->size -= ret;\r
-                       packet->data += ret;\r
+                                       // There might be several frames in one packet.\r
+                                       packet->size -= ret;\r
+                                       packet->data += ret;\r
                        \r
-                       buffer1_.resize(written_bytes);\r
+                                       buffer1_.resize(written_bytes);\r
 \r
-                       buffer1_ = resampler_.resample(std::move(buffer1_));\r
+                                       buffer1_ = resampler_.resample(std::move(buffer1_));\r
                \r
-                       const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
-                       const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+                                       const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+                                       const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
 \r
-                       result->insert(result->end(), samples, samples + n_samples);\r
+                                       send(target_, make_message(std::make_shared<core::audio_buffer>(samples, samples + n_samples)));\r
+                               }\r
+                       }\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
-                               \r
-               return make_message(result, message->token);\r
+\r
+               send(is_running_, false);\r
+               send(target_, make_message(eof_audio()));\r
+\r
+               done();\r
        }\r
 };\r
 \r
index de6f2339f40866bedff37ea9792686eb46ff5604..38b4eb364836e0af6aa92eca5bbff1c165009bf4 100644 (file)
@@ -267,10 +267,10 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                }\r
                catch(...)\r
                {\r
-                       send(is_running_ , false);\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
-\r
+               \r
+               send(is_running_ , false);\r
                send(target_, make_message(core::basic_frame::eof()));\r
 \r
                done();\r
index 3dfe2035b2714b86e9975df32d81ee7a40762405..ee61c3d9424d05ec100d5c375b00f505e92f5080 100644 (file)
@@ -168,10 +168,7 @@ public:
                }       \r
        \r
                BOOST_FOREACH(auto stream, streams_)\r
-               {\r
-                       Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_)));       \r
                        Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_)));       \r
-               }\r
 \r
                done();\r
        }\r
index e840a6d427c443081ba949116d05a45edf4ae059..e69c25227eef0ef7012f05f5562614645ab79624 100644 (file)
@@ -44,9 +44,11 @@ extern "C"
 #include <connect.h>\r
 #include <semaphore.h>\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct video_decoder::implementation : boost::noncopyable\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
 {      \r
        int                                                                             index_;\r
        std::shared_ptr<AVCodecContext>                 codec_context_;\r
@@ -58,9 +60,11 @@ struct video_decoder::implementation : boost::noncopyable
        size_t                                                                  height_;\r
        bool                                                                    is_progressive_;\r
        \r
-       Concurrency::transformer<packet_message_t, video_message_t> transformer_;\r
+       overwrite_buffer<bool>                                  is_running_;\r
+       unbounded_buffer<packet_message_t>              source_;\r
+       ITarget<video_message_t>&                               target_;\r
        \r
-       safe_ptr<Concurrency::semaphore> semaphore_;\r
+       safe_ptr<semaphore> semaphore_;\r
 \r
 public:\r
        explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
@@ -70,10 +74,11 @@ public:
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
                , is_progressive_(true)\r
-               , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
+               , source_([this](const packet_message_t& message)\r
                        {\r
                                return message->payload && message->payload->stream_index == index_;\r
                        })\r
+               , target_(target)\r
                , semaphore_(make_safe<Concurrency::semaphore>(1))\r
        {               \r
                CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
@@ -81,46 +86,74 @@ public:
                CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
                CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
 \r
-               Concurrency::connect(source, transformer_);\r
-       }\r
-               \r
-       video_message_t decode(const packet_message_t& message)\r
-       {\r
-               auto packet = message->payload;\r
+               Concurrency::connect(source, source_);\r
 \r
-               if(!packet)\r
-                       return make_message(std::shared_ptr<AVFrame>());\r
+               start();\r
+       }\r
 \r
-               if(packet == loop_packet(index_))\r
-                       return make_message(loop_video());\r
+       ~implementation()\r
+       {\r
+               send(is_running_, false);\r
+               agent::wait(this);\r
+       }\r
 \r
-               if(packet == eof_packet(index_))\r
-                       return make_message(eof_video());\r
-               \r
-               token token(semaphore_);\r
-               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+       virtual void run()\r
+       {\r
+               try\r
                {\r
-                       av_free(frame);\r
-               });\r
-\r
-               int frame_finished = 0;\r
-               THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
-\r
-               // 1 packet <=> 1 frame.\r
-               // If a decoder consumes less then the whole packet then something is wrong\r
-               // that might be just harmless padding at the end, or a problem with the\r
-               // AVParser or demuxer which puted more then one frame in a AVPacket.\r
-\r
-               if(frame_finished == 0) \r
-                       return make_message(std::shared_ptr<AVFrame>());\r
-\r
-               if(decoded_frame->repeat_pict > 0)\r
-                       CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+                       send(is_running_, true);\r
+                       while(is_running_.value())\r
+                       {\r
+                               auto message = receive(source_);\r
+                               auto packet = message->payload;\r
+                       \r
+                               if(!packet)\r
+                                       continue;\r
+\r
+                               if(packet == loop_packet(index_))\r
+                               {\r
+                                       send(target_, make_message(loop_video()));\r
+                                       continue;\r
+                               }\r
+\r
+                               if(packet == eof_packet(index_))\r
+                                       break;\r
+\r
+                               token token(semaphore_);\r
+                               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+                               {\r
+                                       av_free(frame);\r
+                               });\r
+\r
+                               int frame_finished = 0;\r
+                               THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
+\r
+                               // 1 packet <=> 1 frame.\r
+                               // If a decoder consumes less then the whole packet then something is wrong\r
+                               // that might be just harmless padding at the end, or a problem with the\r
+                               // AVParser or demuxer which puted more then one frame in a AVPacket.\r
+\r
+                               if(frame_finished == 0) \r
+                                       continue;\r
+\r
+                               if(decoded_frame->repeat_pict > 0)\r
+                                       CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
                \r
-               is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+                               is_progressive_ = decoded_frame->interlaced_frame == 0;\r
                                \r
-               Concurrency::wait(10);\r
-               return make_message(decoded_frame, message->token);\r
+                               send(target_, make_message(decoded_frame, message->token));\r
+                               Concurrency::wait(10);\r
+                       }\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
+               \r
+               send(is_running_, false),\r
+               send(target_, make_message(eof_video()));\r
+\r
+               done();\r
        }\r
                \r
        double fps() const\r