diff --git a/modules/decklink/producer/decklink_producer.cpp b/modules/decklink/producer/decklink_producer.cpp
index de51381ca2cd2fe940e0d4086154c8b8842c8a56..34d1bfcf31f60f5a0c46f93bcab33ba45f7750ac 100644
--- a/modules/decklink/producer/decklink_producer.cpp
+++ b/modules/decklink/producer/decklink_producer.cpp
 \r
 #include "../../ffmpeg/producer/filter/filter.h"\r
 #include "../../ffmpeg/producer/util.h"\r
+#include "../../ffmpeg/producer/frame_muxer.h"\r
 \r
+#include <common/log/log.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/concurrency/com_context.h>\r
 #include <common/exception/exceptions.h>\r
 #include <common/memory/memclr.h>\r
 \r
 #include <core/mixer/write_frame.h>\r
-#include <core/producer/frame/audio_transform.h>\r
+#include <core/producer/frame/frame_transform.h>\r
 #include <core/producer/frame/frame_factory.h>\r
-#include <core/producer/frame_muxer.h>\r
 \r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/atomic.h>\r
+#include <agents.h>\r
+#include <agents_extras.h>\r
+#include <ppl.h>\r
 \r
 #include <boost/algorithm/string.hpp>\r
+#include <boost/foreach.hpp>\r
 #include <boost/timer.hpp>\r
 \r
 #if defined(_MSC_VER)\r
@@ -70,55 +73,43 @@ extern "C"
 \r
 #include <functional>\r
 \r
-namespace caspar { \r
+namespace caspar { namespace decklink {\r
                \r
-class decklink_producer : public IDeckLinkInputCallback\r
-{      \r
-       CComPtr<IDeckLink>                                                                                      decklink_;\r
-       CComQIPtr<IDeckLinkInput>                                                                       input_;\r
-       \r
-       const std::wstring                                                                                      model_name_;\r
-       const core::video_format_desc                                                           format_desc_;\r
-       const size_t                                                                                            device_index_;\r
+typedef std::pair<CComPtr<IDeckLinkVideoInputFrame>, CComPtr<IDeckLinkAudioInputPacket>> frame_packet;\r
 \r
-       std::shared_ptr<diagnostics::graph>                                                     graph_;\r
-       boost::timer                                                                                            tick_timer_;\r
-       boost::timer                                                                                            frame_timer_;\r
+class decklink_producer : boost::noncopyable, public IDeckLinkInputCallback\r
+{      \r
+       Concurrency::ITarget<frame_packet>&     target_;\r
 \r
-       std::vector<int16_t>                                                                            audio_samples_;\r
+       CComPtr<IDeckLink>                                      decklink_;\r
+       CComQIPtr<IDeckLinkInput>                       input_;\r
        \r
-       safe_ptr<core::frame_factory>                                                           frame_factory_;\r
+       const std::wstring                                      model_name_;\r
+       const core::video_format_desc           format_desc_;\r
+       const size_t                                            device_index_;\r
 \r
-       tbb::concurrent_bounded_queue<safe_ptr<core::basic_frame>>      frame_buffer_;\r
-       safe_ptr<core::basic_frame>                                                                     tail_;\r
-\r
-       std::exception_ptr                                                                                      exception_;\r
-       filter                                                                                                          filter_;\r
-               \r
-       core::frame_muxer                                                                                       muxer_;\r
+       safe_ptr<diagnostics::graph>            graph_;\r
+       boost::timer                                            tick_timer_;\r
+       boost::timer                                            frame_timer_;\r
 \r
 public:\r
-       decklink_producer(const core::video_format_desc& format_desc, size_t device_index, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter)\r
-               : decklink_(get_device(device_index))\r
+       decklink_producer(Concurrency::ITarget<frame_packet>& target, const core::video_format_desc& format_desc, size_t device_index)\r
+               : target_(target)\r
+               , decklink_(get_device(device_index))\r
                , input_(decklink_)\r
                , model_name_(get_model_name(decklink_))\r
                , format_desc_(format_desc)\r
                , device_index_(device_index)\r
-               , frame_factory_(frame_factory)\r
-               , tail_(core::basic_frame::empty())\r
-               , filter_(filter)\r
-               , muxer_(double_rate(filter) ? format_desc.fps * 2.0 : format_desc.fps, frame_factory->get_video_format_desc().mode, frame_factory->get_video_format_desc().fps)\r
-       {\r
-               frame_buffer_.set_capacity(2);\r
-               \r
-               graph_ = diagnostics::create_graph(boost::bind(&decklink_producer::print, this));\r
+               , graph_ (diagnostics::create_graph("", false))\r
+       {               \r
                graph_->add_guide("tick-time", 0.5);\r
-               graph_->set_color("tick-time", diagnostics::color(0.1f, 0.7f, 0.8f));\r
+               graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   \r
                graph_->set_color("late-frame", diagnostics::color(0.6f, 0.3f, 0.3f));\r
                graph_->set_color("frame-time", diagnostics::color(1.0f, 0.0f, 0.0f));\r
                graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));\r
                graph_->set_color("output-buffer", diagnostics::color(0.0f, 1.0f, 0.0f));\r
-               \r
+               graph_->update_text(narrow(print()));\r
+\r
                auto display_mode = get_display_mode(input_, format_desc_.format, bmdFormat8BitYUV, bmdVideoInputFlagDefault);\r
                \r
                // NOTE: bmdFormat8BitARGB is currently not supported by any decklink card. (2011-05-08)\r
@@ -127,7 +118,7 @@ public:
                                                                        << msg_info(narrow(print()) + " Could not enable video input.")\r
                                                                        << boost::errinfo_api_function("EnableVideoInput"));\r
 \r
-               if(FAILED(input_->EnableAudioInput(bmdAudioSampleRate48kHz, bmdAudioSampleType16bitInteger, 2))) \r
+               if(FAILED(input_->EnableAudioInput(bmdAudioSampleRate48kHz, bmdAudioSampleType32bitInteger, format_desc_.audio_channels))) \r
                        BOOST_THROW_EXCEPTION(caspar_exception() \r
                                                                        << msg_info(narrow(print()) + " Could not enable audio input.")\r
                                                                        << boost::errinfo_api_function("EnableAudioInput"));\r
@@ -143,10 +134,13 @@ public:
                                                                        << boost::errinfo_api_function("StartStreams"));\r
 \r
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";\r
+\r
+               graph_->start();\r
        }\r
 \r
        ~decklink_producer()\r
        {\r
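+               // Tell the ConcRT scheduler this context is about to block (StopStreams/DisableVideoInput can stall),\r
+               // so it may add a worker thread while the destructor waits.\r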
+               Concurrency::scoped_oversubcription_token oversubscribe;\r
                if(input_ != nullptr) \r
                {\r
                        input_->StopStreams();\r
@@ -165,138 +159,191 @@ public:
 \r
        virtual HRESULT STDMETHODCALLTYPE VideoInputFrameArrived(IDeckLinkVideoInputFrame* video, IDeckLinkAudioInputPacket* audio)\r
        {       \r
-               if(!video)\r
-                       return S_OK;\r
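+               // asend() never blocks inside the DeckLink callback; if the downstream buffer cannot take the packet,\r
+               // the frame is simply recorded as dropped on the diagnostics graph.\r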
+               if(!Concurrency::asend(target_, frame_packet(CComPtr<IDeckLinkVideoInputFrame>(video), CComPtr<IDeckLinkAudioInputPacket>(audio))))\r
+                       graph_->add_tag("dropped-frame");\r
+               return S_OK;\r
+       }\r
+               \r
+       std::wstring print() const\r
+       {\r
+               return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"]";\r
+       }\r
+};\r
+       \r
+class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
+{              \r
+       Concurrency::bounded_buffer<ffmpeg::video_message_t>    video_frames_;\r
+       Concurrency::bounded_buffer<ffmpeg::audio_message_t>    audio_buffers_;\r
+       Concurrency::bounded_buffer<ffmpeg::frame_message_t>    muxed_frames_;\r
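+       // Dataflow: run() forwards captured packets into video_frames_/audio_buffers_; frame_muxer2 pairs them\r
+       // and pushes complete frames into muxed_frames_, which receive() consumes.\r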
 \r
-               try\r
-               {\r
-                       graph_->update_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5);\r
-                       tick_timer_.restart();\r
+       const core::video_format_desc           format_desc_;\r
+       const size_t                                            device_index_;\r
 \r
-                       frame_timer_.restart();\r
+       safe_ptr<core::basic_frame>                     last_frame_;\r
+       const int64_t                                           length_;\r
 \r
-                       void* bytes = nullptr;\r
-                       if(FAILED(video->GetBytes(&bytes)) || !bytes)\r
-                               return S_OK;\r
-                       \r
-                       safe_ptr<AVFrame> av_frame(avcodec_alloc_frame(), av_free);     \r
-                       avcodec_get_frame_defaults(av_frame.get());\r
-                                               \r
-                       av_frame->data[0]                       = reinterpret_cast<uint8_t*>(bytes);\r
-                       av_frame->linesize[0]           = video->GetRowBytes();                 \r
-                       av_frame->format                        = PIX_FMT_UYVY422;\r
-                       av_frame->width                         = video->GetWidth();\r
-                       av_frame->height                        = video->GetHeight();\r
-                       av_frame->interlaced_frame      = format_desc_.mode != core::video_mode::progressive;\r
-                       av_frame->top_field_first       = format_desc_.mode == core::video_mode::upper ? 1 : 0;\r
-                                       \r
-                       filter_.push(av_frame);\r
-                       BOOST_FOREACH(auto& av_frame2, filter_.poll())\r
-                               muxer_.push(make_write_frame(this, av_frame2, frame_factory_));         \r
-                                                                       \r
-                       // It is assumed that audio is always equal or ahead of video.\r
-                       if(audio && SUCCEEDED(audio->GetBytes(&bytes)))\r
-                       {\r
-                               auto sample_frame_count = audio->GetSampleFrameCount();\r
-                               auto audio_data = reinterpret_cast<short*>(bytes);\r
-                               audio_samples_.insert(audio_samples_.end(), audio_data, audio_data + sample_frame_count*2);\r
+       ffmpeg::filter                                          filter_;\r
+               \r
+       ffmpeg::frame_muxer2                            muxer_;\r
 \r
-                               if(audio_samples_.size() > frame_factory_->get_video_format_desc().audio_samples_per_frame)\r
-                               {\r
-                                       const auto begin = audio_samples_.begin();\r
-                                       const auto end   = begin +  frame_factory_->get_video_format_desc().audio_samples_per_frame;\r
-                                       muxer_.push(std::vector<int16_t>(begin, end));\r
-                                       audio_samples_.erase(begin, end);\r
-                               }\r
-                       }\r
-                       else\r
-                               muxer_.push(std::vector<int16_t>(frame_factory_->get_video_format_desc().audio_samples_per_frame, 0));\r
-                                       \r
-                       while(!muxer_.empty())\r
-                       {\r
-                               if(!frame_buffer_.try_push(muxer_.pop()))\r
-                                       graph_->add_tag("dropped-frame");\r
-                       }\r
+       mutable Concurrency::single_assignment<std::wstring> print_;\r
 \r
-                       graph_->update_value("frame-time", frame_timer_.elapsed()*format_desc_.fps*0.5);\r
+       safe_ptr<Concurrency::semaphore> semaphore_;\r
 \r
-                       graph_->set_value("output-buffer", static_cast<float>(frame_buffer_.size())/static_cast<float>(frame_buffer_.capacity()));      \r
-               }\r
-               catch(...)\r
-               {\r
-                       exception_ = std::current_exception();\r
-                       return E_FAIL;\r
-               }\r
+       volatile bool is_running_;\r
+public:\r
 \r
-               return S_OK;\r
-       }\r
-       \r
-       safe_ptr<core::basic_frame> get_frame()\r
+       explicit decklink_producer_proxy(const safe_ptr<core::frame_factory>& frame_factory, const core::video_format_desc& format_desc, size_t device_index, const std::wstring& filter_str, int64_t length)\r
+               : video_frames_(1)\r
+               , audio_buffers_(1)\r
+               , muxed_frames_(1)\r
+               , format_desc_(format_desc)\r
+               , device_index_(device_index)\r
+               , last_frame_(core::basic_frame::empty())\r
+               , length_(length)\r
+               , filter_(filter_str)\r
+               , muxer_(&video_frames_, &audio_buffers_, muxed_frames_, ffmpeg::double_rate(filter_str) ? format_desc.fps * 2.0 : format_desc.fps, frame_factory)\r
+               , is_running_(true)\r
+               , semaphore_(make_safe<Concurrency::semaphore>(3))\r
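+               // semaphore_ (count 3) is shared by every ffmpeg::token sent below, which appears to cap the number\r
+               // of in-flight messages in the pipeline.\r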
        {\r
-               if(exception_ != nullptr)\r
-                       std::rethrow_exception(exception_);\r
+               agent::start();\r
+       }\r
 \r
-               if(!frame_buffer_.try_pop(tail_))\r
-                       graph_->add_tag("late-frame");\r
-               graph_->set_value("output-buffer", static_cast<float>(frame_buffer_.size())/static_cast<float>(frame_buffer_.capacity()));      \r
-               return tail_;\r
+       ~decklink_producer_proxy()\r
+       {\r
+               is_running_ = false;\r
+               agent::wait(this);\r
        }\r
-       \r
-       std::wstring print() const\r
+                               \r
+       virtual safe_ptr<core::basic_frame> receive(int)\r
        {\r
-               return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"]";\r
+               auto frame = core::basic_frame::late();\r
+\r
+               try\r
+               {\r
+                       last_frame_ = frame = Concurrency::receive(muxed_frames_)->payload;\r
+               }\r
+               catch(Concurrency::operation_timed_out&)\r
+               {               \r
+                       //graph_->add_tag("underflow"); \r
+               }\r
+\r
+               return frame;\r
        }\r
-};\r
-       \r
-class decklink_producer_proxy : public core::frame_producer\r
-{              \r
-       com_context<decklink_producer> context_;\r
-public:\r
 \r
-       explicit decklink_producer_proxy(const safe_ptr<core::frame_factory>& frame_factory, const core::video_format_desc& format_desc, size_t device_index, const std::wstring& filter_str = L"")\r
-               : context_(L"decklink_producer[" + boost::lexical_cast<std::wstring>(device_index) + L"]")\r
+       virtual safe_ptr<core::basic_frame> last_frame() const\r
        {\r
-               context_.reset([&]{return new decklink_producer(format_desc, device_index, frame_factory, filter_str);}); \r
+               return disable_audio(last_frame_);\r
        }\r
-                               \r
-       virtual safe_ptr<core::basic_frame> receive()\r
+       \r
+       virtual int64_t nb_frames() const \r
        {\r
-               return context_->get_frame();\r
+               return length_;\r
        }\r
        \r
        std::wstring print() const\r
        {\r
-               return context_->print();\r
+               return print_.value();\r
+       }\r
+\r
+       virtual void run()\r
+       {\r
+               try\r
+               {\r
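+                       // DeckLink is a COM API: initialize COM on this agent thread and balance it with CoUninitialize on exit.\r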
+                       struct co_init\r
+                       {\r
+                               co_init()  {CoInitialize(NULL);}\r
+                               ~co_init() {CoUninitialize();}\r
+                       } init;\r
+                       \r
+                       Concurrency::bounded_buffer<frame_packet> input_buffer(2);\r
+\r
+                       std::unique_ptr<decklink_producer> producer;\r
+                       {                               \r
+                               Concurrency::scoped_oversubcription_token oversubscribe;\r
+                               producer.reset(new decklink_producer(input_buffer, format_desc_, device_index_));\r
+                       }\r
+\r
+                       Concurrency::send(print_, producer->print());\r
+\r
+                       while(is_running_)\r
+                       {\r
+                               auto packet = Concurrency::receive(input_buffer);\r
+                               auto video  = packet.first;\r
+                               auto audio  = packet.second;\r
+                               \r
+                               void* bytes = nullptr;\r
+                               if(FAILED(video->GetBytes(&bytes)) || !bytes)\r
+                                       continue;\r
+                       \r
+                               safe_ptr<AVFrame> av_frame(avcodec_alloc_frame(), av_free);     \r
+                               avcodec_get_frame_defaults(av_frame.get());\r
+                                               \r
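+                               // Wrap the DeckLink frame in an AVFrame without copying; the buffer stays owned by the CComPtr held in this scope.\r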
+                               av_frame->data[0]                       = reinterpret_cast<uint8_t*>(bytes);\r
+                               av_frame->linesize[0]           = video->GetRowBytes();                 \r
+                               av_frame->format                        = PIX_FMT_UYVY422;\r
+                               av_frame->width                         = video->GetWidth();\r
+                               av_frame->height                        = video->GetHeight();\r
+                               av_frame->interlaced_frame      = format_desc_.field_mode != core::field_mode::progressive;\r
+                               av_frame->top_field_first       = format_desc_.field_mode == core::field_mode::upper ? 1 : 0;\r
+                                       \r
+                               filter_.push(av_frame);\r
+\r
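+                               // Push filtered video and the matching audio buffer to the muxer inputs concurrently.\r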
+                               Concurrency::parallel_invoke(\r
+                               [&]\r
+                               {\r
+                                       while(true)\r
+                                       {\r
+                                               auto frame = filter_.poll();\r
+                                               if(!frame)\r
+                                                       break;\r
+                                               Concurrency::send(video_frames_, ffmpeg::make_message(frame, std::make_shared<ffmpeg::token>(semaphore_)));\r
+                                       }\r
+                               },\r
+                               [&]\r
+                               {                                                                                                       \r
+                                       // It is assumed that audio is always equal or ahead of video.\r
+                                       if(audio && SUCCEEDED(audio->GetBytes(&bytes)))\r
+                                       {\r
+                                               auto sample_frame_count = audio->GetSampleFrameCount();\r
+                                               auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
+                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), std::make_shared<ffmpeg::token>(semaphore_)));\r
+                                       }\r
+                                       else\r
+                                               Concurrency::send(audio_buffers_, ffmpeg::make_message(ffmpeg::empty_audio(), std::make_shared<ffmpeg::token>(semaphore_)));    \r
+                               });\r
+                       }\r
+\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
+               \r
+               CASPAR_LOG(info) << print() << L" Successfully Uninitialized."; \r
+\r
+               done();\r
        }\r
 };\r
 \r
-safe_ptr<core::frame_producer> create_decklink_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::vector<std::wstring>& params)\r
+safe_ptr<core::frame_producer> create_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::vector<std::wstring>& params)\r
 {\r
        if(params.empty() || !boost::iequals(params[0], "decklink"))\r
                return core::frame_producer::empty();\r
 \r
-       size_t device_index = 1;\r
-       if(params.size() > 1)\r
-               device_index = lexical_cast_or_default(params[1], 1);\r
-\r
-       core::video_format_desc format_desc = core::video_format_desc::get(L"PAL");\r
-       if(params.size() > 2)\r
-       {\r
-               auto desc = core::video_format_desc::get(params[2]);\r
-               if(desc.format != core::video_format::invalid)\r
-                       format_desc = desc;\r
-       }\r
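+       // Named parameters: DEVICE (capture card index, default 1), FILTER (filter string, e.g. DEINTERLACE),\r
+       // LENGTH (frame count, default unbounded).\r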
+       auto device_index       = core::get_param(L"DEVICE", params, 1);\r
+       auto filter_str         = core::get_param<std::wstring>(L"FILTER", params, L"");        \r
+       auto length                     = core::get_param(L"LENGTH", params, std::numeric_limits<int64_t>::max());      \r
        \r
-       std::wstring filter_str = L"";\r
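+       // Expand the DEINTERLACE aliases; the longer DEINTERLACE_BOB token must be replaced before the plain\r
+       // DEINTERLACE substitution can clobber its prefix.\r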
+       boost::replace_all(filter_str, L"DEINTERLACE_BOB", L"YADIF=1:-1");\r
+       boost::replace_all(filter_str, L"DEINTERLACE", L"YADIF=0:-1");\r
 \r
-       auto filter_it = std::find(params.begin(), params.end(), L"FILTER");\r
-       if(filter_it != params.end())\r
-       {\r
-               if(++filter_it != params.end())\r
-                       filter_str = *filter_it;\r
-       }\r
+       auto format_desc        = core::video_format_desc::get(core::get_param<std::wstring>(L"FORMAT", params, L"INVALID"));\r
 \r
-       return make_safe<decklink_producer_proxy>(frame_factory, format_desc, device_index, filter_str);\r
+       if(format_desc.format == core::video_format::invalid)\r
+               format_desc = frame_factory->get_video_format_desc();\r
+                       \r
+       return make_safe<decklink_producer_proxy>(frame_factory, format_desc, device_index, filter_str, length);\r
 }\r
 \r
-}
\ No newline at end of file
+}}
\ No newline at end of file