]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 23 Oct 2011 21:10:09 +0000 (21:10 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 23 Oct 2011 21:10:09 +0000 (21:10 +0000)
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/audio/audio_resampler.cpp
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/filter/filter.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h

index 784de026d6bc77566a2804d0de4d62caad9819d9..6d3f13cdfd75315a47181db486079651c00474b5 100644 (file)
@@ -43,14 +43,14 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <connect.h>\r
+#include <agents.h>\r
 #include <semaphore.h>\r
 \r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct audio_decoder::implementation : public agent, boost::noncopyable\r
+struct audio_decoder::implementation : boost::noncopyable\r
 {      \r
        int                                                                                                                     index_;\r
        std::shared_ptr<AVCodecContext>                                                         codec_context_;         \r
@@ -58,13 +58,10 @@ struct audio_decoder::implementation : public agent, boost::noncopyable
        audio_resampler                                                                                         resampler_;\r
        \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
-\r
-       overwrite_buffer<bool>                                  is_running_;\r
-       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
-       ITarget<safe_ptr<core::audio_buffer>>&  target_;\r
-\r
-       safe_ptr<semaphore>                                             semaphore_;\r
        \r
+       safe_ptr<semaphore> semaphore_;\r
+\r
+       transformer<safe_ptr<AVPacket>, std::shared_ptr<core::audio_buffer>> transformer_;      \r
 public:\r
        explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
                : codec_context_(open_codec(context, AVMEDIA_TYPE_AUDIO, index_))\r
@@ -72,87 +69,70 @@ public:
                                         format_desc.audio_sample_rate, codec_context_->sample_rate,\r
                                         AV_SAMPLE_FMT_S32,                             codec_context_->sample_fmt)\r
                , buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
-               , source_([this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
-               , target_(target)\r
                , semaphore_(make_safe<semaphore>(32))\r
-       {                               \r
-               CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
-\r
-               Concurrency::connect(source, source_);\r
-\r
-               start();\r
+               , transformer_([this](const safe_ptr<AVPacket>& packet){return (*this)(packet);}, &target,\r
+                                          [this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
+       {               \r
+               source.link_target(&transformer_);\r
+               CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;            \r
        }\r
-\r
+       \r
        ~implementation()\r
        {\r
-               send(is_running_, false);\r
                semaphore_->release();\r
-               agent::wait(this);\r
        }\r
 \r
-       virtual void run()\r
-       {\r
+       std::shared_ptr<core::audio_buffer> operator()(const safe_ptr<AVPacket>& packet)\r
+       {                       \r
                try\r
                {\r
-                       send(is_running_, true);\r
-                       while(is_running_.value())\r
-                       {                               \r
-                               auto packet = receive(source_);\r
-                       \r
-                               if(packet == loop_packet(index_))\r
-                               {\r
-                                       send(target_, loop_audio());\r
-                                       continue;\r
-                               }\r
+                       auto tok = make_safe<token>(semaphore_);\r
+\r
+                       if(packet == loop_packet(index_))\r
+                       {\r
+                               avcodec_flush_buffers(codec_context_.get());\r
+                               return loop_audio();\r
+                       }\r
 \r
-                               if(packet == eof_packet(index_))\r
-                                       break;\r
+                       if(packet == eof_packet(index_))                        \r
+                               return eof_audio();                     \r
 \r
-                               auto result = std::make_shared<core::audio_buffer>();\r
+                       auto result = safe_ptr<core::audio_buffer>(new core::audio_buffer(), [this, tok](core::audio_buffer* p)\r
+                       {\r
+                               delete p;\r
+                       });\r
 \r
-                               while(packet->size > 0)\r
-                               {\r
-                                       buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
-                                       int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+                       while(packet->size > 0)\r
+                       {\r
+                               buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+                               int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
                \r
-                                       int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+                               int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
 \r
-                                       // There might be several frames in one packet.\r
-                                       packet->size -= ret;\r
-                                       packet->data += ret;\r
+                               // There might be several frames in one packet.\r
+                               packet->size -= ret;\r
+                               packet->data += ret;\r
                        \r
-                                       buffer1_.resize(written_bytes);\r
+                               buffer1_.resize(written_bytes);\r
 \r
-                                       buffer1_ = resampler_.resample(std::move(buffer1_));\r
+                               buffer1_ = resampler_.resample(std::move(buffer1_));\r
                \r
-                                       const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
-                                       const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
-\r
-                                       auto audio_buffer = make_safe<core::audio_buffer>(samples, samples + n_samples);\r
-                                       safe_ptr<core::audio_buffer> audio(audio_buffer.get(), [this, audio_buffer](core::audio_buffer*)\r
-                                       {\r
-                                               semaphore_->release();\r
-                                       });\r
-                                       semaphore_->acquire();\r
-\r
-                                       send(target_, audio);\r
-                                       Concurrency::wait(2);\r
-                               }\r
+                               const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+                               const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+\r
+                               result->insert(result->end(), samples, samples + n_samples);\r
                        }\r
+                       return result;\r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       return eof_audio();\r
                }\r
-\r
-               send(is_running_, false);\r
-               send(target_, eof_audio());\r
-\r
-               done();\r
        }\r
 };\r
 \r
-audio_decoder::audio_decoder(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
+audio_decoder::audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
        : impl_(new implementation(source, target, context, format_desc))\r
 {\r
 }\r
@@ -162,4 +142,5 @@ int64_t audio_decoder::nb_frames() const
        return 0;\r
 }\r
 \r
+\r
 }}
\ No newline at end of file
index 292b87baf0b5ce8c65e80f133cb7899193a1f56c..b87593a74b934fcd5c7591ba966e1840c1f40341 100644 (file)
@@ -48,7 +48,7 @@ class audio_decoder : boost::noncopyable
 public:\r
 \r
        typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>>& target_t;\r
        \r
        explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
        \r
index bef47926e039839ae27e9f876bb2123ffaceeee2..e2ae9bb77e0d90ccd8378b1410f1fa1966b292dd 100644 (file)
@@ -48,10 +48,10 @@ struct audio_resampler::implementation
 \r
                        buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
 \r
-                       CASPAR_LOG(warning) << L"Resampling." <<\r
-                                                                       L" sample_rate:" << input_channels  <<\r
-                                                                       L" audio_channels:" << input_channels  <<\r
-                                                                       L" sample_fmt:" << input_sample_format;\r
+                       CASPAR_LOG(warning) << L"Resampling" <<\r
+                                                                       L" sample_rate: " << input_channels  <<\r
+                                                                       L" audio_channels: " << input_channels  <<\r
+                                                                       L" sample_fmt: " << input_sample_format;\r
 \r
                        CASPAR_VERIFY(resampler, caspar_exception());\r
 \r
index 0331840b275257df958a3ddb23c9e90d6ea04d0d..b42b7077ecd03510f11c752985b1974bc660980b 100644 (file)
@@ -62,17 +62,17 @@ struct ffmpeg_producer : public core::frame_producer
        const size_t                                                                    length_;\r
        \r
        unbounded_buffer<safe_ptr<AVPacket>>                    packets_;\r
-       unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
-       unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
+       unbounded_buffer<std::shared_ptr<AVFrame>>                              video_;\r
+       unbounded_buffer<std::shared_ptr<core::audio_buffer>>   audio_;\r
        call<safe_ptr<AVPacket>>                                                throw_away_;\r
        bounded_buffer<safe_ptr<core::basic_frame>>             frames_;\r
                \r
        const safe_ptr<diagnostics::graph>                              graph_;\r
                                        \r
+       input                                                                                   input_; \r
        std::shared_ptr<audio_decoder>                                  audio_decoder_; \r
        std::shared_ptr<video_decoder>                                  video_decoder_;\r
        std::unique_ptr<frame_muxer2>                                   muxer_;\r
-       input                                                                                   input_; \r
 \r
        safe_ptr<core::basic_frame>                                             last_frame_;\r
        \r
index 8fc266a65ee5510b5f7945cda0969f3cd144fa05..b3d95d27312b3aefa015996211921a7f2cb4f4c1 100644 (file)
@@ -6,6 +6,8 @@
 \r
 #include "../../ffmpeg_error.h"\r
 \r
+#include <common/exception/exceptions.h>\r
+\r
 #include <boost/assign.hpp>\r
 \r
 #include <cstdio>\r
@@ -70,63 +72,77 @@ struct filter::implementation
                if(!frame)\r
                        return;\r
 \r
+               if(frame->data[0] == nullptr || frame->width < 1)\r
+                       BOOST_THROW_EXCEPTION(invalid_argument());\r
+\r
                if(filters_.empty())\r
                {\r
                        bypass_.push(frame);\r
                        return;\r
                }\r
-\r
-               if(!graph_)\r
+               \r
+               try\r
                {\r
-                       try\r
+                       if(!graph_)\r
                        {\r
-                               graph_.reset(avfilter_graph_alloc(), [](AVFilterGraph* p){avfilter_graph_free(&p);});\r
+                               try\r
+                               {\r
+                                       graph_.reset(avfilter_graph_alloc(), [](AVFilterGraph* p){avfilter_graph_free(&p);});\r
                                                                \r
-                               // Input\r
-                               std::stringstream args;\r
-                               args << frame->width << ":" << frame->height << ":" << frame->format << ":" << 0 << ":" << 0 << ":" << 0 << ":" << 0; // don't care about pts and aspect_ratio\r
-                               THROW_ON_ERROR2(avfilter_graph_create_filter(&buffersrc_ctx_, avfilter_get_by_name("buffer"), "src", args.str().c_str(), NULL, graph_.get()), "[filter]");\r
-\r
-                               // OPIX_FMT_BGRAutput\r
-                               AVBufferSinkParams *buffersink_params = av_buffersink_params_alloc();\r
-                               buffersink_params->pixel_fmts = pix_fmts_.data();\r
-                               THROW_ON_ERROR2(avfilter_graph_create_filter(&buffersink_ctx_, avfilter_get_by_name("buffersink"), "out", NULL, buffersink_params, graph_.get()), "[filter]");\r
+                                       // Input\r
+                                       std::stringstream args;\r
+                                       args << frame->width << ":" << frame->height << ":" << frame->format << ":" << 0 << ":" << 0 << ":" << 0 << ":" << 0; // don't care about pts and aspect_ratio\r
+                                       THROW_ON_ERROR2(avfilter_graph_create_filter(&buffersrc_ctx_, avfilter_get_by_name("buffer"), "src", args.str().c_str(), NULL, graph_.get()), "[filter]");\r
+\r
+                                       // OPIX_FMT_BGRAutput\r
+                                       AVBufferSinkParams *buffersink_params = av_buffersink_params_alloc();\r
+                                       buffersink_params->pixel_fmts = pix_fmts_.data();\r
+                                       THROW_ON_ERROR2(avfilter_graph_create_filter(&buffersink_ctx_, avfilter_get_by_name("buffersink"), "out", NULL, buffersink_params, graph_.get()), "[filter]");\r
                        \r
-                               AVFilterInOut* outputs = avfilter_inout_alloc();\r
-                               AVFilterInOut* inputs  = avfilter_inout_alloc();\r
+                                       AVFilterInOut* outputs = avfilter_inout_alloc();\r
+                                       AVFilterInOut* inputs  = avfilter_inout_alloc();\r
                        \r
-                               outputs->name                   = av_strdup("in");\r
-                               outputs->filter_ctx             = buffersrc_ctx_;\r
-                               outputs->pad_idx                = 0;\r
-                               outputs->next                   = NULL;\r
-\r
-                               inputs->name                    = av_strdup("out");\r
-                               inputs->filter_ctx              = buffersink_ctx_;\r
-                               inputs->pad_idx                 = 0;\r
-                               inputs->next                    = NULL;\r
+                                       outputs->name                   = av_strdup("in");\r
+                                       outputs->filter_ctx             = buffersrc_ctx_;\r
+                                       outputs->pad_idx                = 0;\r
+                                       outputs->next                   = NULL;\r
+\r
+                                       inputs->name                    = av_strdup("out");\r
+                                       inputs->filter_ctx              = buffersink_ctx_;\r
+                                       inputs->pad_idx                 = 0;\r
+                                       inputs->next                    = NULL;\r
                        \r
-                               THROW_ON_ERROR2(avfilter_graph_parse(graph_.get(), filters_.c_str(), &inputs, &outputs, NULL), "[filter]");\r
+                                       THROW_ON_ERROR2(avfilter_graph_parse(graph_.get(), filters_.c_str(), &inputs, &outputs, NULL), "[filter]");\r
                        \r
-                               avfilter_inout_free(&inputs);\r
-                               avfilter_inout_free(&outputs);\r
+                                       avfilter_inout_free(&inputs);\r
+                                       avfilter_inout_free(&outputs);\r
 \r
-                               THROW_ON_ERROR2(avfilter_graph_config(graph_.get(), NULL), "[filter]");                 \r
+                                       THROW_ON_ERROR2(avfilter_graph_config(graph_.get(), NULL), "[filter]");                 \r
 \r
-                               for(size_t n = 0; n < graph_->filter_count; ++n)\r
+                                       for(size_t n = 0; n < graph_->filter_count; ++n)\r
+                                       {\r
+                                               auto filter_name = graph_->filters[n]->name;\r
+                                               if(strstr(filter_name, "yadif") != 0)\r
+                                                       parallel_yadif_ctx_ = make_parallel_yadif(graph_->filters[n]);\r
+                                       }\r
+                               }\r
+                               catch(...)\r
                                {\r
-                                       auto filter_name = graph_->filters[n]->name;\r
-                                       if(strstr(filter_name, "yadif") != 0)\r
-                                               parallel_yadif_ctx_ = make_parallel_yadif(graph_->filters[n]);\r
+                                       graph_ = nullptr;\r
+                                       throw;\r
                                }\r
                        }\r
-                       catch(...)\r
-                       {\r
-                               graph_ = nullptr;\r
-                               throw;\r
-                       }\r
+               \r
+                       THROW_ON_ERROR2(av_vsrc_buffer_add_frame(buffersrc_ctx_, frame.get(), 0), "[filter]");\r
+               }\r
+               catch(ffmpeg_error&)\r
+               {\r
+                       throw;\r
+               }\r
+               catch(...)\r
+               {\r
+                       BOOST_THROW_EXCEPTION(ffmpeg_error() << boost::errinfo_nested_exception(boost::current_exception()));\r
                }\r
-                       \r
-               THROW_ON_ERROR2(av_vsrc_buffer_add_frame(buffersrc_ctx_, frame.get(), 0), "[filter]");\r
        }\r
 \r
        std::shared_ptr<AVFrame> poll()\r
@@ -143,36 +159,47 @@ struct filter::implementation
                if(!graph_)\r
                        return nullptr;\r
                \r
-               if(avfilter_poll_frame(buffersink_ctx_->inputs[0])) \r
+               try\r
                {\r
-                       AVFilterBufferRef *picref;\r
-                       THROW_ON_ERROR2(av_buffersink_get_buffer_ref(buffersink_ctx_, &picref, 0), "[filter]");\r
-\r
-            if (picref) \r
-                       {               \r
-                               safe_ptr<AVFrame> frame(avcodec_alloc_frame(), [=](AVFrame* p)\r
-                               {\r
-                                       av_free(p);\r
-                                       avfilter_unref_buffer(picref);\r
-                               });\r
-\r
-                               avcodec_get_frame_defaults(frame.get());        \r
-\r
-                               memcpy(frame->data,     picref->data,     sizeof(frame->data));\r
-                               memcpy(frame->linesize, picref->linesize, sizeof(frame->linesize));\r
-                               frame->format                           = picref->format;\r
-                               frame->width                            = picref->video->w;\r
-                               frame->height                           = picref->video->h;\r
-                               frame->pkt_pos                          = picref->pos;\r
-                               frame->interlaced_frame         = picref->video->interlaced;\r
-                               frame->top_field_first          = picref->video->top_field_first;\r
-                               frame->key_frame                        = picref->video->key_frame;\r
-                               frame->pict_type                        = picref->video->pict_type;\r
-                               frame->sample_aspect_ratio      = picref->video->sample_aspect_ratio;\r
-\r
-                               return frame;\r
-            }\r
-        }\r
+                       if(avfilter_poll_frame(buffersink_ctx_->inputs[0])) \r
+                       {\r
+                               AVFilterBufferRef *picref;\r
+                               THROW_ON_ERROR2(av_buffersink_get_buffer_ref(buffersink_ctx_, &picref, 0), "[filter]");\r
+\r
+                               if (picref) \r
+                               {               \r
+                                       safe_ptr<AVFrame> frame(avcodec_alloc_frame(), [=](AVFrame* p)\r
+                                       {\r
+                                               av_free(p);\r
+                                               avfilter_unref_buffer(picref);\r
+                                       });\r
+\r
+                                       avcodec_get_frame_defaults(frame.get());        \r
+\r
+                                       memcpy(frame->data,     picref->data,     sizeof(frame->data));\r
+                                       memcpy(frame->linesize, picref->linesize, sizeof(frame->linesize));\r
+                                       frame->format                           = picref->format;\r
+                                       frame->width                            = picref->video->w;\r
+                                       frame->height                           = picref->video->h;\r
+                                       frame->pkt_pos                          = picref->pos;\r
+                                       frame->interlaced_frame         = picref->video->interlaced;\r
+                                       frame->top_field_first          = picref->video->top_field_first;\r
+                                       frame->key_frame                        = picref->video->key_frame;\r
+                                       frame->pict_type                        = picref->video->pict_type;\r
+                                       frame->sample_aspect_ratio      = picref->video->sample_aspect_ratio;\r
+\r
+                                       return frame;\r
+                               }\r
+                       }\r
+               }\r
+               catch(ffmpeg_error&)\r
+               {\r
+                       throw;\r
+               }\r
+               catch(...)\r
+               {\r
+                       BOOST_THROW_EXCEPTION(ffmpeg_error() << boost::errinfo_nested_exception(boost::current_exception()));\r
+               }\r
 \r
                return nullptr;\r
        }\r
index f7b80e902204cc3182415804b3a33b22f1d2692a..7d6c367757e660b715d8c77025af39d5fb808edf 100644 (file)
@@ -48,8 +48,8 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
        mutable single_assignment<safe_ptr<filter>>             filter_;\r
        const safe_ptr<core::frame_factory>                             frame_factory_;\r
        \r
-       call<safe_ptr<AVFrame>>                                                 push_video_;\r
-       call<safe_ptr<core::audio_buffer>>                              push_audio_;\r
+       call<std::shared_ptr<AVFrame>>                                  push_video_;\r
+       call<std::shared_ptr<core::audio_buffer>>               push_audio_;\r
        \r
        unbounded_buffer<safe_ptr<AVFrame>>                             video_;\r
        unbounded_buffer<safe_ptr<core::audio_buffer>>  audio_;\r
@@ -86,6 +86,7 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
        {\r
                send(is_running_, false);\r
                agent::wait(this);\r
+               CASPAR_LOG(trace) << "[frame_muxer] Stopped.";\r
        }\r
 \r
        std::shared_ptr<core::write_frame> receive_video()\r
@@ -149,12 +150,11 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        send(is_running_, true);\r
                        while(is_running_.value())\r
                        {\r
-                               auto audio = receive_audio();                                                           \r
-                               auto video = receive_video();\r
-\r
+                               auto audio = receive_audio();   \r
                                if(!audio)\r
                                        break;\r
-\r
+                                                                                       \r
+                               auto video = receive_video();\r
                                if(!video)\r
                                        break;\r
 \r
@@ -221,14 +221,14 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                done();\r
        }\r
                        \r
-       void push_video(const safe_ptr<AVFrame>& video_frame)\r
-       {                               \r
-               if(!is_running_.value())\r
+       void push_video(const std::shared_ptr<AVFrame>& video_frame)\r
+       {               \r
+               if(!video_frame)\r
                        return;\r
 \r
                if(video_frame == eof_video() || video_frame == empty_video())\r
                {\r
-                       send(video_, video_frame);\r
+                       send(video_, make_safe_ptr(video_frame));\r
                        return;\r
                }\r
                                \r
@@ -237,9 +237,14 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                \r
                try\r
                {\r
+                       if(!is_running_.value())\r
+                               return;\r
+\r
                        if(!display_mode_.has_value())\r
                                initialize_display_mode(*video_frame);\r
                                                \r
+                       //send(video_, make_safe_ptr(video_frame));\r
+\r
                        //if(hints & core::frame_producer::ALPHA_HINT)\r
                        //      video_frame->format = make_alpha_format(video_frame->format);\r
                \r
@@ -267,14 +272,14 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                }\r
        }\r
 \r
-       void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
+       void push_audio(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
        {\r
-               if(!is_running_.value())\r
+               if(!audio_samples)\r
                        return;\r
 \r
                if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
                {\r
-                       send(audio_, audio_samples);\r
+                       send(audio_, make_safe_ptr(audio_samples));\r
                        return;\r
                }\r
 \r
@@ -282,7 +287,10 @@ struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopya
                        return;         \r
 \r
                try\r
-               {\r
+               {               \r
+                       if(!is_running_.value())\r
+                               return;\r
+\r
                        audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
                \r
                        while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
index 5a1aa9cba2b20ccb909253ddb6551e61e33d0238..5a00697fc6d12df862dd41e70f605ada536a04e3 100644 (file)
@@ -31,8 +31,8 @@ class frame_muxer2 : boost::noncopyable
 {\r
 public:\r
        \r
-       typedef Concurrency::ISource<safe_ptr<AVFrame>>                         video_source_t;\r
-       typedef Concurrency::ISource<safe_ptr<core::audio_buffer>>      audio_source_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<AVFrame>>                          video_source_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>>       audio_source_t;\r
        typedef Concurrency::ITarget<safe_ptr<core::basic_frame>>       target_t;\r
                                                                 \r
        frame_muxer2(video_source_t* video_source,\r
index 4bf5f00efd6e90c86bc731146443a3937ba61fb2..4fc2cde3a0836af1302e1f33a39a1ffe5f520bc4 100644 (file)
@@ -112,12 +112,22 @@ public:
                                                                \r
                graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
        }\r
+\r
+       ~implementation()\r
+       {\r
+               send(is_running_, false);\r
+               semaphore_->release();\r
+               semaphore_->release();\r
+               agent::wait(this);\r
+               CASPAR_LOG(info) << print() << " Stopped.";\r
+       }\r
        \r
        void stop()\r
        {\r
                send(is_running_, false);\r
                semaphore_->release();\r
-               agent::wait(this);\r
+               semaphore_->release();\r
+               CASPAR_LOG(info) << print() << " Stopping.";\r
        }\r
        \r
        virtual void run()\r
index ee78b3f18abd48e72d0671213e169496acb847af..5a888a17d72d9637b8a84c931c82e626498cf842 100644 (file)
@@ -42,14 +42,14 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <connect.h>\r
+#include <agents.h>\r
 #include <semaphore.h>\r
 \r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
+struct video_decoder::implementation : boost::noncopyable\r
 {      \r
        int                                                                             index_;\r
        safe_ptr<AVCodecContext>                                codec_context_; \r
@@ -59,13 +59,11 @@ struct video_decoder::implementation : public Concurrency::agent, boost::noncopy
        const size_t                                                    height_;\r
 \r
        bool                                                                    is_progressive_;\r
-       \r
-       overwrite_buffer<bool>                                  is_running_;\r
-       unbounded_buffer<safe_ptr<AVPacket>>    source_;\r
-       ITarget<safe_ptr<AVFrame>>&                             target_;\r
-       \r
+               \r
        safe_ptr<semaphore>                                             semaphore_;\r
 \r
+       transformer<safe_ptr<AVPacket>, std::shared_ptr<AVFrame>> transformer_;\r
+\r
 public:\r
        explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
                : codec_context_(open_codec(context, AVMEDIA_TYPE_VIDEO, index_))\r
@@ -74,78 +72,64 @@ public:
                , width_(codec_context_->width)\r
                , height_(codec_context_->height)\r
                , is_progressive_(true)\r
-               , source_([this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
-               , target_(target)\r
-               , semaphore_(make_safe<Concurrency::semaphore>(1)) // IMPORTANT: Must be 1 since avcodec_decode_video2 reuses frame memory.\r
+               , semaphore_(make_safe<semaphore>(1))\r
+               , transformer_([this](const safe_ptr<AVPacket>& packet){return (*this)(packet);}, &target,\r
+                                          [this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
        {               \r
+               source.link_target(&transformer_);\r
                CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
-               \r
-               Concurrency::connect(source, source_);\r
-\r
-               start();\r
        }\r
 \r
        ~implementation()\r
        {\r
-               send(is_running_, false);\r
                semaphore_->release();\r
-               agent::wait(this);\r
        }\r
-\r
-       virtual void run()\r
-       {\r
+               \r
+       std::shared_ptr<AVFrame> operator()(const safe_ptr<AVPacket>& packet)\r
+       {                       \r
                try\r
                {\r
-                       send(is_running_, true);\r
-                       while(is_running_.value())\r
+                       auto tok = make_safe<token>(semaphore_);\r
+\r
+                       if(packet == loop_packet(index_))\r
+                       {\r
+                               avcodec_flush_buffers(codec_context_.get());\r
+                               return loop_video();\r
+                       }\r
+\r
+                       if(packet == eof_packet(index_))                        \r
+                               return eof_video();                     \r
+\r
+                       CASPAR_ASSERT(packet->size > 0);\r
+\r
+                       std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, tok](AVFrame* frame)\r
                        {\r
-                               auto packet = receive(source_);\r
-                       \r
-                               if(packet == loop_packet(index_))\r
-                               {\r
-                                       send(target_, loop_video());\r
-                                       continue;\r
-                               }\r
-\r
-                               if(packet == eof_packet(index_))\r
-                                       break;\r
-\r
-                               std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this](AVFrame* frame)\r
-                               {\r
-                                       av_free(frame);\r
-                                       semaphore_->release();\r
-                               });\r
-                               semaphore_->acquire();\r
-\r
-                               int frame_finished = 0;\r
-                               THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
-\r
-                               // 1 packet <=> 1 frame.\r
-                               // If a decoder consumes less then the whole packet then something is wrong\r
-                               // that might be just harmless padding at the end, or a problem with the\r
-                               // AVParser or demuxer which puted more then one frame in a AVPacket.\r
-\r
-                               if(frame_finished == 0) \r
-                                       continue;\r
-\r
-                               if(decoded_frame->repeat_pict > 0)\r
-                                       CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+                               av_free(frame);\r
+                       });\r
+\r
+                       int frame_finished = 0;\r
+                       THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
+\r
+                       // 1 packet <=> 1 frame.\r
+                       // If a decoder consumes less then the whole packet then something is wrong\r
+                       // that might be just harmless padding at the end, or a problem with the\r
+                       // AVParser or demuxer which puted more then one frame in a AVPacket.\r
+\r
+                       if(frame_finished == 0) \r
+                               return nullptr;\r
+\r
+                       if(decoded_frame->repeat_pict > 0)\r
+                               CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
                \r
-                               is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+                       is_progressive_ = decoded_frame->interlaced_frame == 0;\r
                                \r
-                               send(target_, make_safe_ptr(decoded_frame));\r
-                               Concurrency::wait(2);\r
-                       }\r
+                       return decoded_frame;\r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       return eof_video();\r
                }\r
-               \r
-               send(is_running_, false),\r
-               send(target_, eof_video());\r
-\r
-               done();\r
        }\r
                \r
        double fps() const\r
@@ -154,7 +138,7 @@ public:
        }\r
 };\r
 \r
-video_decoder::video_decoder(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
+video_decoder::video_decoder(source_t& source, target_t& target, AVFormatContext& context) \r
        : impl_(new implementation(source, target, context))\r
 {\r
 }\r
index a6107c1c61594c6a06fbc543496bc559219781cd..bcd14fc8e7db82a74bf194fb187986ad24fd6901 100644 (file)
@@ -48,7 +48,7 @@ class video_decoder : boost::noncopyable
 public:\r
        \r
        typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<AVFrame>>  target_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<AVFrame>>  target_t;\r
        \r
        explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context);   \r
 \r