]> git.sesse.net Git - casparcg/commitdiff
2.0.0.2:
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 9 Dec 2010 21:38:30 +0000 (21:38 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 9 Dec 2010 21:38:30 +0000 (21:38 +0000)
 - Improved consumer synchronization performance.

git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.0.2@276 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

core/consumer/bluefish/bluefish_consumer.cpp
core/consumer/bluefish/bluefish_consumer.h
core/consumer/decklink/decklink_consumer.cpp
core/consumer/decklink/decklink_consumer.h
core/consumer/frame_consumer.h
core/consumer/frame_consumer_device.cpp
core/consumer/oal/oal_consumer.cpp
core/consumer/oal/oal_consumer.h
core/consumer/ogl/ogl_consumer.cpp
core/consumer/ogl/ogl_consumer.h

index ae7da5c88135a2695b1764887524f7ece2a6749d..b4d71698e48fce01f926dd2a992d9978445b7204 100644 (file)
@@ -28,9 +28,9 @@
 #include "exception.h"\r
 #include "memory.h"\r
 \r
-#include "../../processor/write_frame.h"\r
+#include "../../../common/concurrency/executor.h"\r
 \r
-#include <boost/thread.hpp>\r
+#include <boost/optional/optional.hpp>\r
 \r
 #include <tbb/concurrent_queue.h>\r
 \r
@@ -39,7 +39,7 @@
 \r
 namespace caspar { namespace core { namespace bluefish {\r
        \r
-struct consumer::implementation\r
+struct consumer::implementation : boost::noncopyable\r
 {\r
        implementation::implementation(const video_format_desc& format_desc, unsigned int device_index, bool embed_audio) \r
                : device_index_(device_index), format_desc_(format_desc), sdk_(BlueVelvetFactory4()), current_id_(0), embed_audio_(embed_audio)\r
@@ -140,18 +140,14 @@ struct consumer::implementation
                page_locked_buffer::reserve_working_size(MAX_HANC_BUFFER_SIZE * hanc_buffers_.size());          \r
                for(size_t n = 0; n < hanc_buffers_.size(); ++n)\r
                        hanc_buffers_[n] = std::make_shared<page_locked_buffer>(MAX_HANC_BUFFER_SIZE);\r
+                               \r
+               executor_.start();\r
 \r
-               frame_buffer_.set_capacity(1);\r
-               thread_ = boost::thread([=]{run();});\r
-               \r
                CASPAR_LOG(info) << TEXT("BLUECARD INFO: Successfully initialized device ") << device_index_;\r
        }\r
 \r
        ~implementation()\r
        {\r
-               frame_buffer_.push(nullptr);\r
-               thread_.join();\r
-\r
                disable_video_output();\r
 \r
                if(sdk_)\r
@@ -172,65 +168,60 @@ struct consumer::implementation
                        CASPAR_LOG(error) << "BLUECARD ERROR: Failed to disable video output. (device " << device_index_ << TEXT(")");          \r
        }\r
 \r
-       void display(const consumer_frame& frame)\r
+       boost::unique_future<void> display(const consumer_frame& frame)\r
        {\r
-               if(exception_ != nullptr)\r
-                       std::rethrow_exception(exception_);\r
-\r
-               frame_buffer_.push(std::make_shared<consumer_frame>(frame));\r
-       }\r
-\r
-       void do_display(consumer_frame_ptr& frame)\r
-       {\r
-               try\r
+               return executor_.begin_invoke([=]\r
                {\r
-                       auto hanc = hanc_buffers_.front();              \r
-                       std::rotate(hanc_buffers_.begin(), hanc_buffers_.begin() + 1, hanc_buffers_.end());\r
+                       try\r
+                       {\r
+                               auto hanc = hanc_buffers_.front();              \r
+                               std::rotate(hanc_buffers_.begin(), hanc_buffers_.begin() + 1, hanc_buffers_.end());\r
                        \r
-                       static size_t audio_samples = 1920;\r
-                       static size_t audio_nchannels = 2;\r
-                       static std::vector<short> silence(audio_samples*audio_nchannels*2, 0);\r
+                               static size_t audio_samples = 1920;\r
+                               static size_t audio_nchannels = 2;\r
+                               static std::vector<short> silence(audio_samples*audio_nchannels*2, 0);\r
 \r
 \r
-                       unsigned long fieldCount = 0;\r
-                       sdk_->wait_output_video_synch(UPD_FMT_FRAME, fieldCount);\r
+                               unsigned long fieldCount = 0;\r
+                               sdk_->wait_output_video_synch(UPD_FMT_FRAME, fieldCount);\r
                                \r
-                       if(embed_audio_)\r
-                       {               \r
-                               auto& frame_audio_data = frame->audio_data().empty() ? silence : frame->audio_data();\r
+                               if(embed_audio_)\r
+                               {               \r
+                                       auto& frame_audio_data = frame.audio_data().empty() ? silence : frame.audio_data();\r
 \r
-                               encode_hanc(reinterpret_cast<BLUE_UINT32*>(hanc->data()), const_cast<short*>(frame_audio_data.data()), audio_samples, audio_nchannels);\r
+                                       encode_hanc(reinterpret_cast<BLUE_UINT32*>(hanc->data()), const_cast<short*>(frame_audio_data.data()), audio_samples, audio_nchannels);\r
                                                                \r
-                               sdk_->system_buffer_write_async(const_cast<unsigned char*>(frame->data().begin()), \r
-                                                                                                frame->data().size(), \r
-                                                                                                nullptr, \r
-                                                                                                BlueImage_HANC_DMABuffer(current_id_, BLUE_DATA_IMAGE));\r
-\r
-                               sdk_->system_buffer_write_async(hanc->data(),\r
-                                                                                                hanc->size(), \r
-                                                                                                nullptr,                 \r
-                                                                                                BlueImage_HANC_DMABuffer(current_id_, BLUE_DATA_HANC));\r
-\r
-                               if(BLUE_FAIL(sdk_->render_buffer_update(BlueBuffer_Image_HANC(current_id_))))\r
-                                       CASPAR_LOG(trace) << TEXT("BLUEFISH: render_buffer_update failed");\r
+                                       sdk_->system_buffer_write_async(const_cast<unsigned char*>(frame.data().begin()), \r
+                                                                                                        frame.data().size(), \r
+                                                                                                        nullptr, \r
+                                                                                                        BlueImage_HANC_DMABuffer(current_id_, BLUE_DATA_IMAGE));\r
+\r
+                                       sdk_->system_buffer_write_async(hanc->data(),\r
+                                                                                                        hanc->size(), \r
+                                                                                                        nullptr,                 \r
+                                                                                                        BlueImage_HANC_DMABuffer(current_id_, BLUE_DATA_HANC));\r
+\r
+                                       if(BLUE_FAIL(sdk_->render_buffer_update(BlueBuffer_Image_HANC(current_id_))))\r
+                                               CASPAR_LOG(trace) << TEXT("BLUEFISH: render_buffer_update failed");\r
+                               }\r
+                               else\r
+                               {\r
+                                       sdk_->system_buffer_write_async(const_cast<unsigned char*>(frame.data().begin()),\r
+                                                                                                        frame.data().size(), \r
+                                                                                                        nullptr,                 \r
+                                                                                                        BlueImage_DMABuffer(current_id_, BLUE_DATA_IMAGE));\r
+                       \r
+                                       if(BLUE_FAIL(sdk_->render_buffer_update(BlueBuffer_Image(current_id_))))\r
+                                               CASPAR_LOG(trace) << TEXT("BLUEFISH: render_buffer_update failed");\r
+                               }\r
+\r
+                               transferring_frame_ = frame;\r
                        }\r
-                       else\r
+                       catch(...)\r
                        {\r
-                               sdk_->system_buffer_write_async(const_cast<unsigned char*>(frame->data().begin()),\r
-                                                                                                frame->data().size(), \r
-                                                                                                nullptr,                 \r
-                                                                                                BlueImage_DMABuffer(current_id_, BLUE_DATA_IMAGE));\r
-                       \r
-                               if(BLUE_FAIL(sdk_->render_buffer_update(BlueBuffer_Image(current_id_))))\r
-                                       CASPAR_LOG(trace) << TEXT("BLUEFISH: render_buffer_update failed");\r
+                               CASPAR_LOG_CURRENT_EXCEPTION();\r
                        }\r
-\r
-                       transferring_frame_ = frame;\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }\r
+               });\r
        }\r
 \r
 \r
@@ -260,43 +251,20 @@ struct consumer::implementation
                }                                               \r
        }\r
 \r
-       void run()\r
-       {\r
-               while(true)\r
-               {\r
-                       try\r
-                       {\r
-                               consumer_frame_ptr frame;\r
-                               frame_buffer_.pop(frame);\r
-\r
-                               if(!frame)\r
-                                       return;\r
-\r
-                               do_display(frame);\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               exception_ = std::current_exception();\r
-                       }\r
-               }       \r
-       }\r
-               \r
+       common::executor executor_;\r
+                       \r
        BlueVelvetPtr sdk_;\r
        \r
        unsigned int device_index_;\r
        video_format_desc format_desc_;\r
-       \r
-       std::exception_ptr exception_;\r
-       boost::thread thread_;\r
-       tbb::concurrent_bounded_queue<consumer_frame_ptr> frame_buffer_;\r
-       \r
+               \r
        unsigned long   mem_fmt_;\r
        unsigned long   upd_fmt_;\r
        EVideoMode              vid_fmt_; \r
        unsigned long   res_fmt_; \r
        unsigned long   engine_mode_;\r
 \r
-       consumer_frame_ptr transferring_frame_;\r
+       boost::optional<consumer_frame> transferring_frame_;\r
 \r
        std::array<page_locked_buffer_ptr, 3> hanc_buffers_;\r
        int current_id_;\r
@@ -304,7 +272,7 @@ struct consumer::implementation
 };\r
 \r
 consumer::consumer(const video_format_desc& format_desc, unsigned int device_index, bool embed_audio) : impl_(new implementation(format_desc, device_index, embed_audio)){}    \r
-void consumer::display(const consumer_frame& frame){impl_->display(frame);}\r
+boost::unique_future<void> consumer::display(const consumer_frame& frame){return impl_->display(frame);}\r
 \r
 }}}\r
 \r
index 15f3231f65ee908d42031d7751a31003c960dc75..a89e970e3f38326493d19a2bbcc4dfc51eebd099 100644 (file)
@@ -29,7 +29,7 @@ class consumer : public frame_consumer
 public:\r
        consumer(const video_format_desc& format_desc, unsigned int deviceIndex, bool embed_audio = false);\r
        \r
-       virtual void display(const consumer_frame&);            \r
+       virtual boost::unique_future<void> display(const consumer_frame&);              \r
        virtual bool has_sync_clock() const {return true;}\r
 private:\r
        struct implementation;\r
index 8d3b916a177651f375a47b5c8a344a497aa9851f..b9323f2e3eb01de0f61b71031c8fe7bd1b551f8b 100644 (file)
@@ -26,6 +26,7 @@
 #endif\r
 \r
 #include "decklink_consumer.h"\r
+\r
 #include "util.h"\r
 \r
 #include "DeckLinkAPI_h.h"\r
@@ -33,6 +34,7 @@
 #include "../../format/video_format.h"\r
 #include "../../producer/frame_producer_device.h"\r
 \r
+#include "../../../common/concurrency/executor.h"\r
 #include "../../../common/exception/exceptions.h"\r
 #include "../../../common/utility/scope_exit.h"\r
 \r
 \r
 namespace caspar { namespace core { namespace decklink{\r
 \r
-struct decklink_consumer::Implementation\r
+struct decklink_consumer::Implementation : boost::noncopyable\r
 {\r
        Implementation(const video_format_desc& format_desc, bool internalKey) : format_desc_(format_desc), currentFormat_(video_format::pal), internalKey_(internalKey), current_index_(0)\r
-       {                       \r
-               input_.set_capacity(1),\r
-               thread_ = boost::thread([=]{Run();});\r
+       {       \r
+               executor_.start();\r
+               executor_.invoke([=]\r
+               {\r
+                       if(FAILED(CoInitialize(nullptr))) \r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Initialization of COM failed."));         \r
+\r
+                       CComPtr<IDeckLinkIterator> pDecklinkIterator;\r
+                       if(FAILED(pDecklinkIterator.CoCreateInstance(CLSID_CDeckLinkIterator)))\r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("No Decklink drivers installed."));\r
+\r
+                       while(pDecklinkIterator->Next(&decklink_) == S_OK && !decklink_){}      \r
+\r
+                       if(decklink_ == nullptr)\r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("No Decklink card found"));\r
+\r
+                       output_ = decklink_;\r
+                       keyer_ = decklink_;\r
+\r
+                       BSTR pModelName;\r
+                       decklink_->GetModelName(&pModelName);\r
+                       if(pModelName != nullptr)\r
+                               CASPAR_LOG(info) << "DECKLINK: Modelname: " << pModelName;\r
+               \r
+                       unsigned long decklinkVideoFormat = GetDecklinkVideoFormat(format_desc_.format);\r
+                       if(decklinkVideoFormat == ULONG_MAX) \r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Card does not support requested videoformat."));\r
+               \r
+                       currentFormat_ = format_desc_.format;\r
+\r
+                       BMDDisplayModeSupport displayModeSupport;\r
+                       if(FAILED(output_->DoesSupportVideoMode((BMDDisplayMode)decklinkVideoFormat, bmdFormat8BitBGRA, &displayModeSupport)))\r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Card does not support requested videoformat"));\r
+               \r
+                       output_->DisableAudioOutput();\r
+                       if(FAILED(output_->EnableVideoOutput((BMDDisplayMode)decklinkVideoFormat, bmdVideoOutputFlagDefault))) \r
+                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Could not enable video output"));\r
+\r
+                       if(internalKey_) \r
+                       {\r
+                               if(FAILED(keyer_->Enable(FALSE)))                       \r
+                                       CASPAR_LOG(error) << "DECKLINK: Failed to enable internal keyer";                       \r
+                               else if(FAILED(keyer_->SetLevel(255)))                  \r
+                                       CASPAR_LOG(error) << "DECKLINK: Keyer - Failed to set blend-level to max";\r
+                               else\r
+                                       CASPAR_LOG(info) << "DECKLINK: Successfully configured internal keyer";         \r
+                       }\r
+                       else\r
+                       {\r
+                               if(FAILED(keyer_->Enable(TRUE)))                        \r
+                                       CASPAR_LOG(error) << "DECKLINK: Failed to enable external keyer";       \r
+                               else\r
+                                       CASPAR_LOG(info) << "DECKLINK: Successfully configured external keyer";                 \r
+                       }\r
+               \r
+                       reserved_frames_.resize(3);\r
+                       for(int n = 0; n < reserved_frames_.size(); ++n)\r
+                       {\r
+                               if(FAILED(output_->CreateVideoFrame(format_desc_.width, format_desc_.height, format_desc_.size/format_desc_.height, bmdFormat8BitBGRA, bmdFrameFlagDefault, &reserved_frames_[n].second)))\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Failed to create frame."));\r
+\r
+                               if(FAILED(reserved_frames_[n].second->GetBytes(&reserved_frames_[n].first)))\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Failed to get frame bytes."));\r
+                       }\r
+\r
+                       CASPAR_LOG(info) << "DECKLINK: Successfully initialized decklink for " << format_desc_.name;\r
+               });\r
        }\r
 \r
        ~Implementation()\r
-       {\r
-               input_.push(nullptr);\r
-               thread_.join();\r
-\r
+       {               \r
                if(output_) \r
                {\r
                        BOOL bIsRunning = FALSE;\r
@@ -73,124 +136,32 @@ struct decklink_consumer::Implementation
 \r
                        output_->DisableVideoOutput();\r
                }\r
+               CoUninitialize();\r
        }\r
        \r
-       void display(const consumer_frame& frame)\r
+       boost::unique_future<void> display(const consumer_frame& input_frame)\r
        {\r
-               if(exception_ != nullptr)\r
-                       std::rethrow_exception(exception_);\r
-\r
-               input_.push(std::make_shared<consumer_frame>(frame));\r
-       }\r
-               \r
-       void do_display(const consumer_frame& input_frame)\r
-       {\r
-               try\r
-               {\r
-                       auto& output_frame = reserved_frames_[current_index_];\r
-                       current_index_ = (++current_index_) % reserved_frames_.size();\r
-               \r
-                       std::copy(input_frame.data().begin(), input_frame.data().end(), static_cast<char*>(output_frame.first));\r
-                               \r
-                       if(FAILED(output_->DisplayVideoFrameSync(output_frame.second)))\r
-                               CASPAR_LOG(error) << L"DECKLINK: Failed to display frame.";\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-               }\r
-       }\r
-\r
-       void Run()\r
-       {               \r
-               if(FAILED(CoInitialize(nullptr)))\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Initialization of COM failed."));         \r
-               CASPAR_SCOPE_EXIT(CoUninitialize);\r
-               \r
-               init();\r
-               \r
-               while(true)\r
+               return executor_.begin_invoke([=]\r
                {\r
                        try\r
-                       {       \r
-                               consumer_frame_ptr frame;\r
-                               input_.pop(frame);\r
+                       {\r
+                               auto& output_frame = reserved_frames_[current_index_];\r
+                               current_index_ = (++current_index_) % reserved_frames_.size();\r
+               \r
+                               std::copy(input_frame.data().begin(), input_frame.data().end(), static_cast<char*>(output_frame.first));\r
                                \r
-                               if(!frame)\r
-                                       return;\r
-\r
-                               do_display(*frame);\r
+                               if(FAILED(output_->DisplayVideoFrameSync(output_frame.second)))\r
+                                       CASPAR_LOG(error) << L"DECKLINK: Failed to display frame.";\r
                        }\r
                        catch(...)\r
                        {\r
-                               exception_ = std::current_exception();\r
+                               CASPAR_LOG_CURRENT_EXCEPTION();\r
                        }\r
-               }\r
-       }\r
-\r
-       void init()\r
-       {               \r
-               CComPtr<IDeckLinkIterator> pDecklinkIterator;\r
-               if(FAILED(pDecklinkIterator.CoCreateInstance(CLSID_CDeckLinkIterator)))\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("No Decklink drivers installed."));\r
-\r
-               while(pDecklinkIterator->Next(&decklink_) == S_OK && !decklink_){}      \r
-\r
-               if(decklink_ == nullptr)\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("No Decklink card found"));\r
-\r
-               output_ = decklink_;\r
-               keyer_ = decklink_;\r
-\r
-               BSTR pModelName;\r
-               decklink_->GetModelName(&pModelName);\r
-               if(pModelName != nullptr)\r
-                       CASPAR_LOG(info) << "DECKLINK: Modelname: " << pModelName;\r
-               \r
-               unsigned long decklinkVideoFormat = GetDecklinkVideoFormat(format_desc_.format);\r
-               if(decklinkVideoFormat == ULONG_MAX) \r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Card does not support requested videoformat."));\r
-               \r
-               currentFormat_ = format_desc_.format;\r
-\r
-               BMDDisplayModeSupport displayModeSupport;\r
-               if(FAILED(output_->DoesSupportVideoMode((BMDDisplayMode)decklinkVideoFormat, bmdFormat8BitBGRA, &displayModeSupport)))\r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Card does not support requested videoformat"));\r
-               \r
-               output_->DisableAudioOutput();\r
-               if(FAILED(output_->EnableVideoOutput((BMDDisplayMode)decklinkVideoFormat, bmdVideoOutputFlagDefault))) \r
-                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Could not enable video output"));\r
-\r
-               if(internalKey_) \r
-               {\r
-                       if(FAILED(keyer_->Enable(FALSE)))                       \r
-                               CASPAR_LOG(error) << "DECKLINK: Failed to enable internal keyer";                       \r
-                       else if(FAILED(keyer_->SetLevel(255)))                  \r
-                               CASPAR_LOG(error) << "DECKLINK: Keyer - Failed to set blend-level to max";\r
-                       else\r
-                               CASPAR_LOG(info) << "DECKLINK: Successfully configured internal keyer";         \r
-               }\r
-               else\r
-               {\r
-                       if(FAILED(keyer_->Enable(TRUE)))                        \r
-                               CASPAR_LOG(error) << "DECKLINK: Failed to enable external keyer";       \r
-                       else\r
-                               CASPAR_LOG(info) << "DECKLINK: Successfully configured external keyer";                 \r
-               }\r
-               \r
-               reserved_frames_.resize(3);\r
-               for(int n = 0; n < reserved_frames_.size(); ++n)\r
-               {\r
-                       if(FAILED(output_->CreateVideoFrame(format_desc_.width, format_desc_.height, format_desc_.size/format_desc_.height, bmdFormat8BitBGRA, bmdFrameFlagDefault, &reserved_frames_[n].second)))\r
-                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Failed to create frame."));\r
-\r
-                       if(FAILED(reserved_frames_[n].second->GetBytes(&reserved_frames_[n].first)))\r
-                               BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("DECKLINK: Failed to get frame bytes."));\r
-               }\r
-\r
-               CASPAR_LOG(info) << "DECKLINK: Successfully initialized decklink for " << format_desc_.name;\r
+               });\r
        }\r
                        \r
+       common::executor executor_;\r
+\r
        std::vector<std::pair<void*, CComPtr<IDeckLinkMutableVideoFrame>>> reserved_frames_;\r
        size_t  current_index_;\r
 \r
@@ -201,18 +172,14 @@ struct decklink_consumer::Implementation
        \r
        video_format::type currentFormat_;\r
        video_format_desc format_desc_;\r
-\r
-       std::exception_ptr      exception_;\r
-       boost::thread           thread_;\r
-       tbb::concurrent_bounded_queue<consumer_frame_ptr> input_;\r
 };\r
 \r
 decklink_consumer::decklink_consumer(const video_format_desc& format_desc, bool internalKey) : pImpl_(new Implementation(format_desc, internalKey))\r
 {}\r
 \r
-void decklink_consumer::display(const consumer_frame& frame)\r
+boost::unique_future<void> decklink_consumer::display(const consumer_frame& frame)\r
 {\r
-       pImpl_->display(frame);\r
+       return pImpl_->display(frame);\r
 }\r
        \r
 }}}    
\ No newline at end of file
index 16b8249d0fc6b3f8e49863f2e5b50b82b85e0022..eef3ef800a490ce77ecfc7505f8b6c16c4307b5c 100644 (file)
@@ -30,7 +30,7 @@ class decklink_consumer : public frame_consumer
 public:\r
        explicit decklink_consumer(const video_format_desc& format_desc, bool internalKey = false);\r
        \r
-       virtual void display(const consumer_frame&);\r
+       virtual boost::unique_future<void> display(const consumer_frame&);\r
        virtual bool has_sync_clock() const {return false;}\r
 private:\r
        struct Implementation;\r
index 786ff684b8b6912f4f1d2bd3666412cbad304bff..e5d5d2a753e41ef4f08bcadad3570c5fb22ecc33 100644 (file)
@@ -24,6 +24,8 @@
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <boost/thread/future.hpp>\r
+\r
 #include <memory>\r
 \r
 namespace caspar { namespace core {\r
@@ -32,8 +34,19 @@ struct frame_consumer : boost::noncopyable
 {\r
        virtual ~frame_consumer() {}\r
 \r
-       virtual void prepare(const consumer_frame&){}\r
-       virtual void display(const consumer_frame&){}\r
+       virtual boost::unique_future<void> prepare(const consumer_frame&)\r
+       {\r
+               boost::promise<void> promise;\r
+               promise.set_value();\r
+               return promise.get_future();\r
+       }\r
+\r
+       virtual boost::unique_future<void> display(const consumer_frame&)\r
+       {\r
+               boost::promise<void> promise;\r
+               promise.set_value();\r
+               return promise.get_future();\r
+       }\r
        virtual bool has_sync_clock() const {return false;}\r
 };\r
 typedef std::shared_ptr<frame_consumer> frame_consumer_ptr;\r
index f588d83175d62c0a221c2389d8f70cd8b987b599..4f9c3feaeb3b07be8f052d41164b1ffbab543df8 100644 (file)
@@ -52,15 +52,7 @@ public:
                if(consumers.empty())\r
                        BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
                                << msg_info("frame_consumer_device requires atleast one consumer."));\r
-\r
-               //if(std::any_of(consumers.begin(), consumers.end(), \r
-               //      [&](const frame_consumer_ptr& pConsumer)\r
-               //      { return pConsumer->get_video_format_desc() != format_desc;}))\r
-               //{\r
-               //      BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("consumer") \r
-               //              << msg_info("All consumers must have same frameformat as frame_consumer_device."));\r
-               //}\r
-\r
+               \r
                needs_clock_ = !std::any_of(consumers.begin(), consumers.end(), std::mem_fn(&frame_consumer::has_sync_clock));\r
                is_running_ = true;\r
                display_thread_ = boost::thread([=]{run();});\r
@@ -88,36 +80,43 @@ public:
        }\r
 \r
        void display_frame(const consumer_frame& frame)\r
-       {\r
-               BOOST_FOREACH(const frame_consumer_ptr& consumer, consumers_)\r
+       {               \r
+               typedef std::vector<std::pair<boost::unique_future<void>, frame_consumer_ptr>> sync_container;\r
+               sync_container sync;\r
+               boost::range::transform(consumers_, std::back_inserter(sync), [=](const frame_consumer_ptr& consumer)\r
                {\r
-                       try\r
+                       return std::make_pair(consumer->prepare(frame), consumer);\r
+               });\r
+               \r
+               prepared_frames_.push_back(frame);\r
+                               \r
+               if(prepared_frames_.size() > 2)\r
+               {       \r
+                       boost::range::transform(consumers_, std::back_inserter(sync), [=](const frame_consumer_ptr& consumer)\r
                        {\r
-                               consumer->prepare(frame);\r
-                               prepared_frames_.push_back(frame);\r
+                               return std::make_pair(consumer->display(prepared_frames_.front()), consumer);\r
+                       });\r
+                       prepared_frames_.pop_front();\r
+               }\r
 \r
-                               if(prepared_frames_.size() > 2)\r
-                               {\r
-                                       consumer->display(prepared_frames_.front());\r
-                                       prepared_frames_.pop_front();\r
-                               }\r
+               boost::range::for_each(sync, [=](sync_container::value_type& sync)\r
+               {\r
+                       try\r
+                       {\r
+                               sync.first.get();\r
                        }\r
                        catch(...)\r
                        {\r
-                               try\r
+                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                               boost::range::remove_erase(consumers_, sync.second);\r
+                               CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
+                               if(consumers_.empty())\r
                                {\r
-                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                                       boost::range::remove_erase(consumers_, consumer);\r
-                                       CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
-                                       if(consumers_.empty())\r
-                                       {\r
-                                               CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
-                                               is_running_ = false;\r
-                                       }\r
+                                       CASPAR_LOG(warning) << "No consumers available. Shutting down frame_consumer_device.";\r
+                                       is_running_ = false;\r
                                }\r
-                               catch(...){}\r
                        }\r
-               }\r
+               });\r
        }\r
 \r
        std::deque<consumer_frame> prepared_frames_;\r
index 938edaf7ffb14bc1ee2824c19923c862f49db158..fe0e167a767c29b319b02f27014f6f67a50a939a 100644 (file)
@@ -35,24 +35,29 @@ struct consumer::implementation : public sf::SoundStream, boost::noncopyable
 {\r
        implementation() : container_(5), underrun_count_(0)\r
        {\r
-               input_.set_capacity(3);\r
                sf::SoundStream::Initialize(2, 48000);\r
        }\r
 \r
        ~implementation()\r
        {\r
-               input_.clear();\r
                Stop();\r
        }\r
        \r
-       void push(const consumer_frame& frame)\r
+       boost::unique_future<void> push(const consumer_frame& frame)\r
        {\r
-               // NOTE: tbb::concurrent_queue does not have rvalue support. \r
-               // Use shared_ptr to emulate move semantics\r
                input_.push(frame.audio_data()); \r
+\r
+               auto promise = std::make_shared<boost::promise<void>>();\r
                \r
                if(GetStatus() != Playing && input_.size() > 2)\r
                        Play();\r
+               \r
+               if(GetStatus() == Playing)\r
+                       promises_.push(promise);\r
+               else\r
+                       promise->set_value();\r
+\r
+               return promise->get_future();\r
        }\r
        \r
        bool OnStart() \r
@@ -65,8 +70,11 @@ struct consumer::implementation : public sf::SoundStream, boost::noncopyable
        {\r
                static std::vector<short> silence(1920*2, 0);\r
                \r
-               std::vector<short> audio_data;\r
-               \r
+               std::shared_ptr<boost::promise<void>> promise;\r
+               promises_.pop(promise);\r
+               promise->set_value();\r
+\r
+               std::vector<short> audio_data;          \r
                if(!input_.try_pop(audio_data))\r
                {\r
                        if(underrun_count_ == 0)\r
@@ -91,14 +99,17 @@ struct consumer::implementation : public sf::SoundStream, boost::noncopyable
                        data.Samples = container_.back().data();\r
                        data.NbSamples = container_.back().size();\r
                }\r
+\r
                return true;\r
        }\r
 \r
+       tbb::concurrent_bounded_queue<std::shared_ptr<boost::promise<void>>> promises_;\r
+       tbb::concurrent_bounded_queue<std::vector<short>> input_;\r
+\r
        long underrun_count_;\r
        boost::circular_buffer<std::vector<short>> container_;\r
-       tbb::concurrent_bounded_queue<std::vector<short>> input_;\r
 };\r
 \r
 consumer::consumer(const video_format_desc&) : impl_(new implementation()){}\r
-void consumer::prepare(const consumer_frame& frame){impl_->push(frame);}\r
+boost::unique_future<void> consumer::prepare(const consumer_frame& frame){return impl_->push(frame);}\r
 }}}\r
index 6bfd844860741bb52c5d274fa8b83d6b046e82c3..33e2cb03e0f25e5130843540c94a267cd1ade993 100644 (file)
@@ -28,7 +28,7 @@ class consumer : public frame_consumer
 public:        \r
        explicit consumer(const video_format_desc& format_desc);\r
        \r
-       virtual void prepare(const consumer_frame& frame);\r
+       virtual boost::unique_future<void> prepare(const consumer_frame& frame);\r
        virtual bool has_sync_clock() const {return true;}\r
 private:\r
        struct implementation;\r
index c8b090e479590c90968e3d9f7bf467e0461df3e1..8a8c0b77cc3da730d36702c1bc19970267d9dca1 100644 (file)
@@ -26,6 +26,7 @@
 #include "../../processor/write_frame.h"\r
 #include "../../../common/gl/utility.h"\r
 #include "../../../common/gl/pixel_buffer_object.h"\r
+#include "../../../common/concurrency/executor.h"\r
 \r
 #include <boost/thread.hpp>\r
 \r
@@ -81,47 +82,40 @@ struct consumer::implementation : boost::noncopyable
                screenX_ = 0;\r
                screenY_ = 0;\r
 #endif\r
-               frame_buffer_.set_capacity(1);\r
-               thread_ = boost::thread([=]{run();});\r
-       }\r
-       \r
-       ~implementation()\r
-       {\r
-               frame_buffer_.push(nullptr);\r
-               thread_.join();\r
-       }\r
 \r
-       void init()     \r
-       {\r
-               window_.Create(sf::VideoMode(format_desc_.width, format_desc_.height, 32), "CasparCG", windowed_ ? sf::Style::Titlebar : sf::Style::Fullscreen);\r
-               window_.ShowMouseCursor(false);\r
-               window_.SetPosition(screenX_, screenY_);\r
-               window_.SetSize(screen_width_, screen_height_);\r
-               window_.SetActive();\r
-               GL(glEnable(GL_TEXTURE_2D));\r
-               GL(glDisable(GL_DEPTH_TEST));           \r
-               GL(glClearColor(0.0, 0.0, 0.0, 0.0));\r
-               GL(glViewport(0, 0, format_desc_.width, format_desc_.height));\r
-               glLoadIdentity();\r
+               executor_.start();\r
+               executor_.invoke([=]\r
+               {\r
+                       window_.Create(sf::VideoMode(format_desc_.width, format_desc_.height, 32), "CasparCG", windowed_ ? sf::Style::Titlebar : sf::Style::Fullscreen);\r
+                       window_.ShowMouseCursor(false);\r
+                       window_.SetPosition(screenX_, screenY_);\r
+                       window_.SetSize(screen_width_, screen_height_);\r
+                       window_.SetActive();\r
+                       GL(glEnable(GL_TEXTURE_2D));\r
+                       GL(glDisable(GL_DEPTH_TEST));           \r
+                       GL(glClearColor(0.0, 0.0, 0.0, 0.0));\r
+                       GL(glViewport(0, 0, format_desc_.width, format_desc_.height));\r
+                       glLoadIdentity();\r
                                \r
-               wratio_ = static_cast<float>(format_desc_.width)/static_cast<float>(format_desc_.width);\r
-               hratio_ = static_cast<float>(format_desc_.height)/static_cast<float>(format_desc_.height);\r
-\r
-               std::pair<float, float> target_ratio = None();\r
-               if(stretch_ == ogl::fill)\r
-                       target_ratio = Fill();\r
-               else if(stretch_ == ogl::uniform)\r
-                       target_ratio = Uniform();\r
-               else if(stretch_ == ogl::uniform_to_fill)\r
-                       target_ratio = UniformToFill();\r
-\r
-               wSize_ = target_ratio.first;\r
-               hSize_ = target_ratio.second;\r
+                       wratio_ = static_cast<float>(format_desc_.width)/static_cast<float>(format_desc_.width);\r
+                       hratio_ = static_cast<float>(format_desc_.height)/static_cast<float>(format_desc_.height);\r
+\r
+                       std::pair<float, float> target_ratio = None();\r
+                       if(stretch_ == ogl::fill)\r
+                               target_ratio = Fill();\r
+                       else if(stretch_ == ogl::uniform)\r
+                               target_ratio = Uniform();\r
+                       else if(stretch_ == ogl::uniform_to_fill)\r
+                               target_ratio = UniformToFill();\r
+\r
+                       wSize_ = target_ratio.first;\r
+                       hSize_ = target_ratio.second;\r
                                        \r
-               pbos_[0].create(format_desc_.width, format_desc_.height);\r
-               pbos_[1].create(format_desc_.width, format_desc_.height);\r
+                       pbos_[0].create(format_desc_.width, format_desc_.height);\r
+                       pbos_[1].create(format_desc_.width, format_desc_.height);\r
+               });\r
        }\r
-\r
+       \r
        std::pair<float, float> None()\r
        {\r
                float width = static_cast<float>(format_desc_.width)/static_cast<float>(screen_width_);\r
@@ -176,42 +170,20 @@ struct consumer::implementation : boost::noncopyable
                pbos_[next_index].begin_write();\r
        }\r
                        \r
-       void display(const consumer_frame& frame)\r
+       boost::unique_future<void> display(const consumer_frame& frame)\r
        {\r
-               if(exception_ != nullptr)\r
-                       std::rethrow_exception(exception_);\r
-\r
-               frame_buffer_.push(std::make_shared<consumer_frame>(frame));\r
-       }\r
-\r
-       void run()\r
-       {                       \r
-               init();\r
-                               \r
-               while(true)\r
+               return executor_.begin_invoke([=]\r
                {\r
-                       try\r
-                       {               \r
-                               consumer_frame_ptr frame;\r
-                               frame_buffer_.pop(frame);\r
-\r
-                               if(!frame)\r
-                                       return;\r
-\r
-                               sf::Event e;\r
-                               while(window_.GetEvent(e)){}\r
-                               window_.SetActive();\r
-                               render(*frame);\r
-                               window_.Display();\r
-                               \r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               exception_ = std::current_exception();\r
-                       }\r
-               }               \r
-       }               \r
+                       sf::Event e;\r
+                       while(window_.GetEvent(e)){}\r
+                       window_.SetActive();\r
+                       render(frame);\r
+                       window_.Display();\r
+               });\r
+       }\r
 \r
+       common::executor executor_;\r
+       \r
        float wratio_;\r
        float hratio_;\r
        \r
@@ -229,15 +201,11 @@ struct consumer::implementation : boost::noncopyable
                                \r
        stretch stretch_;\r
        video_format_desc format_desc_;\r
-\r
-       std::exception_ptr exception_;\r
-       boost::thread thread_;\r
-       tbb::concurrent_bounded_queue<consumer_frame_ptr> frame_buffer_;\r
-\r
+       \r
        sf::Window window_;\r
 };\r
 \r
 consumer::consumer(const video_format_desc& format_desc, unsigned int screen_index, stretch stretch, bool windowed)\r
 : impl_(new implementation(format_desc, screen_index, stretch, windowed)){}\r
-void consumer::display(const consumer_frame& frame){impl_->display(frame);}\r
+boost::unique_future<void> consumer::display(const consumer_frame& frame){return impl_->display(frame);}\r
 }}}\r
index 6fbe02c80bd4d042c1bdaf555a01052d7d817ff3..d948845de4b0d8a5c96aa68f5845015c66cd10ad 100644 (file)
@@ -38,7 +38,7 @@ class consumer : public frame_consumer
 public:        \r
        explicit consumer(const video_format_desc& format_desc, unsigned int screen_index = 0, stretch stretch = stretch::fill, bool windowed = false);\r
        \r
-       virtual void display(const consumer_frame& frame);\r
+       virtual boost::unique_future<void> display(const consumer_frame& frame);\r
        virtual bool has_sync_clock() const {return false;}\r
 private:\r
        struct implementation;\r