]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 29 Oct 2011 11:02:31 +0000 (11:02 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sat, 29 Oct 2011 11:02:31 +0000 (11:02 +0000)
modules/bluefish/consumer/bluefish_consumer.cpp

index 0d4acf54a695dae72914f38708938f1c5b68c736..fc34b2e58774155cf29d1c1164e372ebe6d7b72c 100644 (file)
@@ -26,7 +26,7 @@
 \r
 #include <core/mixer/read_frame.h>\r
 \r
-#include <common/concurrency/executor.h>\r
+#include <common/concurrency/governor.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/memory/memclr.h>\r
 #include <common/memory/memcpy.h>\r
@@ -38,6 +38,7 @@
 \r
 #include <tbb/concurrent_queue.h>\r
 \r
+#include <agents.h>\r
 #include <concrt_extras.h>\r
 \r
 #include <boost/timer.hpp>\r
 #include <memory>\r
 #include <array>\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace bluefish { \r
                        \r
-struct bluefish_consumer : boost::noncopyable\r
+struct bluefish_consumer : public Concurrency::agent, boost::noncopyable\r
 {\r
-       safe_ptr<CBlueVelvet4>                          blue_;\r
-       const unsigned int                                      device_index_;\r
-       const core::video_format_desc           format_desc_;\r
-\r
-       const std::wstring                                      model_name_;\r
+       unbounded_buffer<safe_ptr<core::read_frame>> frames_;\r
 \r
-       safe_ptr<diagnostics::graph>            graph_;\r
-       boost::timer                                            frame_timer_;\r
-       boost::timer                                            tick_timer_;\r
-       boost::timer                                            sync_timer_;    \r
-                       \r
-       unsigned int                                            vid_fmt_;\r
+       safe_ptr<CBlueVelvet4>                                          blue_;\r
+       const unsigned int                                                      device_index_;\r
+       const core::video_format_desc                           format_desc_;\r
 \r
-       std::array<blue_dma_buffer_ptr, 4>      reserved_frames_;       \r
-       tbb::concurrent_bounded_queue<std::shared_ptr<core::read_frame>> frame_buffer_;\r
+       const std::wstring                                                      model_name_;\r
 \r
-       int                                                                     preroll_count_;\r
+       safe_ptr<diagnostics::graph>                            graph_;\r
+       boost::timer                                                            frame_timer_;\r
+       boost::timer                                                            tick_timer_;\r
+       boost::timer                                                            sync_timer_;    \r
+                       \r
+       const unsigned int                                                      vid_fmt_;\r
 \r
-       const bool                                                      embedded_audio_;\r
-       const bool                                                      key_only_;\r
+       std::array<blue_dma_buffer_ptr, 4>                      reserved_frames_;       \r
        \r
-       executor                                                        executor_;\r
+       const bool                                                                      embedded_audio_;\r
+       const bool                                                                      key_only_;\r
+       \r
+       governor                                                                        governor_;\r
+       tbb::atomic<bool>                                                       is_running_;\r
 public:\r
        bluefish_consumer(const core::video_format_desc& format_desc, unsigned int device_index, bool embedded_audio, bool key_only) \r
                : blue_(create_blue(device_index))\r
@@ -78,13 +81,10 @@ public:
                , format_desc_(format_desc) \r
                , model_name_(get_card_desc(*blue_))\r
                , vid_fmt_(get_video_mode(*blue_, format_desc))\r
-               , preroll_count_(0)\r
                , embedded_audio_(embedded_audio)\r
                , key_only_(key_only)\r
-               , executor_(print())\r
+               , governor_(1)\r
        {\r
-               executor_.set_capacity(core::consumer_buffer_depth());\r
-\r
                graph_->add_guide("tick-time", 0.5);\r
                graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   \r
                graph_->add_guide("frame-time", 0.5f);  \r
@@ -159,19 +159,22 @@ public:
                {\r
                        return std::make_shared<blue_dma_buffer>(format_desc_.size, n++);\r
                });\r
+\r
+               is_running_ = true;\r
+               start();\r
                                                                \r
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";\r
        }\r
 \r
        ~bluefish_consumer()\r
        {\r
+               is_running_ = false;\r
+               governor_.cancel();\r
+               agent::wait(this);\r
                try\r
-               {\r
-                       executor_.invoke([&]\r
-                       {\r
-                               disable_video_output();\r
-                               blue_->device_detach();         \r
-                       });\r
+               {                       \r
+                       disable_video_output();\r
+                       blue_->device_detach();         \r
                }\r
                catch(...)\r
                {\r
@@ -200,23 +203,20 @@ public:
        \r
        void send(const safe_ptr<core::read_frame>& frame)\r
        {       \r
-               if(preroll_count_ < executor_.capacity())\r
-               {\r
-                       while(preroll_count_++ < executor_.capacity())\r
-                               schedule_next_video(make_safe<core::read_frame>());\r
-               }\r
-               \r
-               schedule_next_video(frame);                     \r
+               auto ticket = governor_.acquire();\r
+               Concurrency::send(frames_, safe_ptr<core::read_frame>(frame.get(), [frame, ticket](core::read_frame*){}));\r
        }\r
        \r
-       void schedule_next_video(const safe_ptr<core::read_frame>& frame)\r
+       virtual void run()\r
        {\r
                static std::vector<int16_t> silence(MAX_HANC_BUFFER_SIZE, 0);\r
                \r
-               executor_.begin_invoke([=]\r
+               try\r
                {\r
-                       try\r
+                       while(is_running_)\r
                        {\r
+                               auto frame = receive(frames_);\r
+\r
                                const size_t audio_samples       = format_desc_.audio_samples_per_frame;\r
                                const size_t audio_nchannels = format_desc_.audio_channels;\r
 \r
@@ -238,7 +238,10 @@ public:
 \r
                                sync_timer_.restart();\r
                                unsigned long n_field = 0;\r
-                               blue_->wait_output_video_synch(UPD_FMT_FRAME, n_field);\r
+                               {\r
+                                       scoped_oversubcription_token oversubscribe;\r
+                                       blue_->wait_output_video_synch(UPD_FMT_FRAME, n_field);\r
+                               }\r
                                graph_->update_value("sync-time", static_cast<float>(sync_timer_.elapsed()*format_desc_.fps*0.5));\r
 \r
                                // Send and display\r
@@ -279,15 +282,15 @@ public:
                                graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
 \r
                                graph_->update_value("tick-time", static_cast<float>(tick_timer_.elapsed()*format_desc_.fps*0.5));\r
-                               tick_timer_.restart();\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                               tick_timer_.restart();                          \r
                        }\r
-                       graph_->set_value("input-buffer", static_cast<double>(executor_.size())/static_cast<double>(executor_.capacity()));\r
-               });\r
-               graph_->set_value("input-buffer", static_cast<double>(executor_.size())/static_cast<double>(executor_.capacity()));\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
+\r
+               done();\r
        }\r
 \r
        void encode_hanc(BLUE_UINT32* hanc_data, void* audio_data, size_t audio_samples, size_t audio_nchannels)\r
@@ -323,7 +326,6 @@ struct bluefish_consumer_proxy : public core::frame_consumer
        const size_t                                            device_index_;\r
        const bool                                                      embedded_audio_;\r
        const bool                                                      key_only_;\r
-       core::video_format_desc                         format_desc_;\r
 public:\r
 \r
        bluefish_consumer_proxy(size_t device_index, bool embedded_audio, bool key_only)\r
@@ -335,8 +337,10 @@ public:
        \r
        virtual void initialize(const core::video_format_desc& format_desc)\r
        {\r
+               if(consumer_)\r
+                       BOOST_THROW_EXCEPTION(invalid_operation());\r
+\r
                Concurrency::scoped_oversubcription_token oversubscribe;\r
-               format_desc_ = format_desc;\r
                consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_));\r
        }\r
        \r
@@ -359,6 +363,11 @@ public:
 \r
                return L"bluefish [" + boost::lexical_cast<std::wstring>(device_index_) + L"]";\r
        }\r
+\r
+       virtual size_t buffer_depth() const\r
+       {\r
+               return 1;\r
+       }\r
 };     \r
 \r
 safe_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r