]> git.sesse.net Git - casparcg/commitdiff
2.0.0.2: Refactored executor. Optimized frame allocations.
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 25 May 2011 22:47:16 +0000 (22:47 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 25 May 2011 22:47:16 +0000 (22:47 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.0.2@811 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

19 files changed:
common/concurrency/com_context.h
common/concurrency/executor.h
common/diagnostics/graph.cpp
common/gl/gl_check.cpp
core/consumer/frame_consumer_device.cpp
core/mixer/frame_mixer_device.cpp
core/mixer/gpu/ogl_device.cpp
core/mixer/gpu/ogl_device.h
core/mixer/image/image_mixer.cpp
core/producer/destroy_producer_proxy.cpp
core/producer/frame_producer_device.cpp
modules/bluefish/consumer/bluefish_consumer.cpp
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/ffmpeg/producer/input.cpp
modules/ogl/consumer/ogl_consumer.cpp
modules/silverlight/producer/silverlight_producer.cpp
protocol/cii/CIIProtocolStrategy.cpp
shell/caspar.config
shell/main.cpp

index c0638339ec2c9779813205a14bb8ae74bdfd790f..dfd66ddc8a29593ed9d83240c5edfd8c060fcf5d 100644 (file)
@@ -15,8 +15,7 @@ class com_context : public executor
 {\r
        std::unique_ptr<T> instance_;\r
 public:\r
-       com_context(const std::wstring& name)\r
-               : executor(name, true)\r
+       com_context(const std::wstring& name) : executor(name)\r
        {\r
                executor::begin_invoke([]\r
                {\r
@@ -26,7 +25,6 @@ public:
 \r
        ~com_context()\r
        {\r
-               executor::clear();\r
                executor::invoke([&]\r
                {\r
                        instance_.reset(nullptr);\r
index 32ac8e76fc2dbfed16fe60f9a56c0280dc27e7af..a2427d3ceb3a951d25b0d3c99fdd70378678a421 100644 (file)
@@ -61,76 +61,68 @@ inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
 \r
 }\r
 \r
+enum priority\r
+{\r
+       high_priority,\r
+       normal_priority,\r
+       priority_count\r
+};\r
+\r
 class executor : boost::noncopyable\r
 {\r
        const std::string name_;\r
        boost::thread thread_;\r
        tbb::atomic<bool> is_running_;\r
-       tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
+       \r
+       typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
+       function_queue execution_queue_[priority_count];\r
+       \r
 public:\r
                \r
-       explicit executor(const std::wstring& name, bool auto_start = false) : name_(narrow(name)) // noexcept\r
+       explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept\r
        {\r
-               is_running_ = false;\r
-               if(auto_start)\r
-                       start();\r
+               thread_ = boost::thread([this]{run();});\r
+               is_running_ = true;\r
        }\r
        \r
        virtual ~executor() // noexcept\r
        {\r
                stop();\r
-               clear();\r
+               \r
+               std::function<void()> func;\r
+               while(execution_queue_[normal_priority].try_pop(func)){} // Wake all waiting push threads.\r
+\r
                if(boost::this_thread::get_id() != thread_.get_id())\r
                        thread_.join();\r
        }\r
 \r
        void set_capacity(size_t capacity) // noexcept\r
        {\r
-               execution_queue_.set_capacity(capacity);\r
+               execution_queue_[normal_priority].set_capacity(capacity);\r
        }\r
-\r
-       void start() // noexcept\r
-       {\r
-               if(is_running_.fetch_and_store(true))\r
-                       return;\r
-               clear();\r
-               thread_ = boost::thread([this]{run();});\r
-       }\r
-                       \r
+                               \r
        void stop() // noexcept\r
        {\r
                is_running_ = false;    \r
-               execution_queue_.try_push([]{});\r
+               execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.\r
        }\r
 \r
-       void wait()\r
+       void wait() // noexcept\r
        {\r
                invoke([]{});\r
        }\r
-       \r
-       void clear() // noexcept\r
-       {\r
-               std::function<void()> func;\r
-               auto size = execution_queue_.size();\r
-               for(int n = 0; n < size; ++n)\r
-               {\r
-                       try\r
-                       {\r
-                               if(!execution_queue_.try_pop(func))\r
-                                       return;\r
-                       }\r
-                       catch(boost::broken_promise&){}\r
-               }\r
-       }\r
-                       \r
+                               \r
        template<typename Func>\r
-       auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
+       auto begin_invoke(Func&& func, priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
        {       \r
                typedef boost::packaged_task<decltype(func())> task_type;\r
                                \r
                auto task = task_type(std::forward<Func>(func));\r
                auto future = task.get_future();\r
                \r
+               if(!is_running_)\r
+                       return std::move(future);       \r
+\r
                task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
                {\r
                        try\r
@@ -150,40 +142,57 @@ public:
                        mutable task_type task;\r
                } task_adaptor(std::move(task));\r
 \r
-               execution_queue_.push([=]\r
+               execution_queue_[priority].push([=]\r
                {\r
                        try{task_adaptor();}\r
                        catch(boost::task_already_started&){}\r
                        catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
                });\r
+\r
+               if(priority != normal_priority)\r
+                       execution_queue_[normal_priority].push(nullptr);\r
                                        \r
                return std::move(future);               \r
        }\r
        \r
        template<typename Func>\r
-       auto invoke(Func&& func) -> decltype(func())\r
+       auto invoke(Func&& func, priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
        {\r
-               if(!is_running_)\r
-                       start();\r
-\r
                if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
                        return func();\r
                \r
-               return begin_invoke(std::forward<Func>(func)).get();\r
+               return begin_invoke(std::forward<Func>(func), prioriy).get();\r
+       }\r
+\r
+       void yield() // noexcept\r
+       {\r
+               if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.\r
+                       return;\r
+\r
+               std::function<void()> func;\r
+               while(execution_queue_[high_priority].try_pop(func))\r
+               {\r
+                       if(func)\r
+                               func();\r
+               }       \r
        }\r
        \r
-       tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
-       tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
-       bool empty() const              { return execution_queue_.empty();      }\r
-       bool is_running() const { return is_running_;                           }       \r
+       function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity();  }\r
+       function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size();  }\r
+       bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty();     }\r
+       bool is_running() const /*noexcept*/ { return is_running_; }    \r
                \r
 private:\r
        \r
        void execute() // noexcept\r
        {\r
                std::function<void()> func;\r
-               execution_queue_.pop(func);     \r
-               func();\r
+               execution_queue_[normal_priority].pop(func);    \r
+\r
+               yield();\r
+\r
+               if(func)\r
+                       func();\r
        }\r
 \r
        void run() // noexcept\r
@@ -192,6 +201,7 @@ private:
                detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
                while(is_running_)\r
                        execute();\r
+               is_running_ = false;\r
        }       \r
 };\r
 \r
index a1f96350b76d1fe13a166c99eedd2ee4214cfc8d..73fab59156e0c95909469780eb034fdfa4f1555c 100644 (file)
@@ -75,7 +75,6 @@ public:
 private:\r
        context() : executor_(L"diagnostics")\r
        {\r
-               executor_.start();\r
                executor_.begin_invoke([this]\r
                {                       \r
                        SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);\r
index 512e5aefd5a03854d6f3cf741343f071cbd39e84..3d694ff06c30413b8db4b9e0b77f1191751db3be 100644 (file)
@@ -101,11 +101,14 @@ void SMFL_GLCheckError(const std::string& expr, const std::string& File, unsigne
                }\r
 \r
                // Log the error\r
-               CASPAR_LOG(error) << "An internal OpenGL call failed in "\r
+               std::stringstream str;\r
+               str << "An internal OpenGL call failed in "\r
                                  << File.substr(File.find_last_of("\\/") + 1).c_str() << " (" << Line << ") : "\r
                                  << Error.c_str() << ", " << Desc.c_str()\r
                                  << ", " << expr.c_str()\r
                                  << std::endl;\r
+               BOOST_THROW_EXCEPTION(caspar_exception() <<\r
+                       msg_info(str.str()));\r
        }\r
 }\r
 \r
index b32b3a771e2f6e6ec2ff82e2aea4935c773e343f..2832c514acea85a60758dd2cf19585a49e7b287f 100644 (file)
@@ -60,7 +60,7 @@ public:
        implementation( const video_format_desc& format_desc) \r
                : format_desc_(format_desc)\r
                , diag_(diagnostics::create_graph(std::string("frame_consumer_device")))\r
-               , executor_(L"frame_consumer_device", true)\r
+               , executor_(L"frame_consumer_device")\r
        {               \r
                diag_->set_color("input-buffer", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
                diag_->add_guide("frame-time", 0.5f);   \r
index 38e1e73704e5ad19fe0d430c5dd91dd12f5a24d4..663a9a1e8e76efee8408fee23c134d85ad2d168a 100644 (file)
@@ -111,7 +111,7 @@ public:
                : format_desc_(format_desc)\r
                , diag_(diagnostics::create_graph(narrow(print())))\r
                , image_mixer_(format_desc)\r
-               , executor_(L"frame_mixer_device", true)\r
+               , executor_(L"frame_mixer_device")\r
        {\r
                diag_->add_guide("frame-time", 0.5f);   \r
                diag_->set_color("frame-time", diagnostics::color(1.0f, 0.0f, 0.0f));\r
index 4a411a6097fc2bd1c4ae954e38c2826492eceafe..5f9c9b952bb62b2b34ec5a13a82ad4386d9a7f2d 100644 (file)
@@ -28,7 +28,7 @@
 \r
 namespace caspar { namespace core {\r
 \r
-ogl_device::ogl_device() : executor_(L"ogl_device", true)\r
+ogl_device::ogl_device() : executor_(L"ogl_device")\r
 {\r
        invoke([=]\r
        {\r
@@ -67,12 +67,16 @@ safe_ptr<device_buffer> ogl_device::do_create_device_buffer(size_t width, size_t
                executor_.invoke([&]\r
                {\r
                        buffer = std::make_shared<device_buffer>(width, height, stride);\r
-               });     \r
+\r
+                       if(glGetError() != GL_NO_ERROR)\r
+                               BOOST_THROW_EXCEPTION(std::bad_alloc());\r
+\r
+               }, high_priority);      \r
                executor_.begin_invoke([=]\r
                {\r
                        auto buffer = std::make_shared<device_buffer>(width, height, stride);\r
                        pool->try_push(buffer);\r
-               });     \r
+               }, high_priority);      \r
        }\r
                        \r
        return safe_ptr<device_buffer>(buffer.get(), [=](device_buffer*){pool->push(buffer);});\r
@@ -93,7 +97,11 @@ safe_ptr<host_buffer> ogl_device::do_create_host_buffer(size_t size, host_buffer
                                buffer->map();\r
                        else\r
                                buffer->unmap();\r
-               });     \r
+\r
+                       if(glGetError() != GL_NO_ERROR)\r
+                               BOOST_THROW_EXCEPTION(std::bad_alloc());\r
+\r
+               }, high_priority);      \r
                executor_.begin_invoke([=]\r
                {\r
                        auto buffer = std::make_shared<host_buffer>(size, usage);\r
@@ -102,7 +110,7 @@ safe_ptr<host_buffer> ogl_device::do_create_host_buffer(size_t size, host_buffer
                        else\r
                                buffer->unmap();\r
                        pool->try_push(buffer);\r
-               });     \r
+               }, high_priority);      \r
        }\r
        \r
        return safe_ptr<host_buffer>(buffer.get(), [=](host_buffer*)\r
@@ -113,8 +121,10 @@ safe_ptr<host_buffer> ogl_device::do_create_host_buffer(size_t size, host_buffer
                                buffer->map();\r
                        else\r
                                buffer->unmap();\r
-                       pool->push(buffer);\r
-               });\r
+                       \r
+                       if(glGetError() == GL_NO_ERROR)\r
+                               pool->push(buffer);\r
+               }, high_priority);\r
        });\r
 }\r
 \r
index 8c37415048a7e6e4c66cb8095a490d07649f91a9..7812165539c103a068df84754ed8f5454376eadb 100644 (file)
@@ -94,6 +94,11 @@ public:
                return get_instance().do_create_host_buffer(size, usage);\r
        }\r
 \r
+       static void yield()\r
+       {\r
+               get_instance().executor_.yield();\r
+       }\r
+\r
        static std::wstring get_version();\r
 };\r
 \r
index 5cfe5bc10b2a0bb7d1b2c23c7ae5d4e1bd82e158..319add062e715383f46b83e21152e236876418f6 100644 (file)
@@ -153,8 +153,11 @@ public:
                                draw_buffer_->attach(); \r
 \r
                                BOOST_FOREACH(auto item, layer)                 \r
+                               {       \r
+                                       ogl_device::yield(); // Allow quick buffer allocation to execute.\r
                                        draw(item);     \r
-                                                               \r
+                               }\r
+\r
                                layer_key_ = local_key_; // If there was only key in last layer then use it as key for the entire next layer.\r
                                local_key_ = false;\r
 \r
index ff936a5de4d8b97be4ff40a5fb67b69d91e3d2ac..69a9a140f8cd4edc65aa55e14b9b3916b3854707 100644 (file)
@@ -30,7 +30,7 @@ struct destroyer
 {\r
        executor executor_;\r
 \r
-       destroyer() : executor_(L"destroyer", true){}\r
+       destroyer() : executor_(L"destroyer"){}\r
        \r
        void destroy(safe_ptr<frame_producer>&& producer)\r
        {\r
index f856a26c34c94e39c66b1bed5a799dff1ad3cdd8..9fae118f282e8e23a6b39a8d8aa5ec6db35eb62c 100644 (file)
@@ -65,7 +65,7 @@ public:
        implementation(const video_format_desc& format_desc)  \r
                : format_desc_(format_desc)\r
                , diag_(diagnostics::create_graph(std::string("frame_producer_device")))\r
-               , executor_(L"frame_producer_device", true)\r
+               , executor_(L"frame_producer_device")\r
        {\r
                diag_->add_guide("frame-time", 0.5f);   \r
                diag_->set_color("frame-time", diagnostics::color(1.0f, 0.0f, 0.0f));\r
index d51b6b9a6568ddb0a1692445486a149963129770..d6c45db904ed648c856cfa3da1452219347ec11c 100644 (file)
@@ -120,7 +120,7 @@ public:
                , engine_mode_(VIDEO_ENGINE_FRAMESTORE)         \r
                , vid_fmt_(VID_FMT_INVALID) \r
                , embedded_audio_(embedded_audio)\r
-               , executor_(print(), true)\r
+               , executor_(print())\r
        {\r
                if(!BlueVelvetFactory4 || (embedded_audio_ && (!encode_hanc_frame || !encode_hanc_frame)))\r
                        BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Bluefish drivers not found."));\r
@@ -234,7 +234,6 @@ public:
 \r
        ~bluefish_consumer()\r
        {\r
-               executor_.clear();\r
                executor_.invoke([&]\r
                {\r
                        disable_video_output();\r
index 510b318fd40fa5f79fbb6f7941ceaa822220909a..fa3e12997b46fcdc58c56b92418cd335249c0efa 100644 (file)
@@ -86,7 +86,7 @@ public:
                , video_outbuf_(1920*1080*4)\r
                , audio_outbuf_(48000)\r
                , format_desc_(format_desc)\r
-               , executor_(L"ffmpeg_consumer", true)\r
+               , executor_(L"ffmpeg_consumer")\r
        {\r
                active_ = executor_.begin_invoke([]{});\r
 \r
index d0f3983b42b73980e9fc1a01fd7d92c51ba7fd60..e70785b13694a9c8df4d78e2270798027a5e95d7 100644 (file)
@@ -199,7 +199,6 @@ public:
                for(size_t n = 0; n < 16; ++n) // Read some packets for pre-rolling.\r
                        read_next_packet();\r
                                                        \r
-               executor_.start();\r
                executor_.begin_invoke([this]{read_file();});\r
                CASPAR_LOG(info) << print() << " Started.";\r
        }\r
index 6def1adcf7d96a9c6f374a7a90207b1055e7c7c2..7c3bb3288b53757ff6e118607006eefdf0a455a3 100644 (file)
@@ -144,7 +144,6 @@ public:
                if(screen_index != 0)\r
                        CASPAR_LOG(warning) << print() << " only supports screen_index=0 for non-Win32";\r
 #endif         \r
-               executor_.start();\r
                executor_.invoke([=]\r
                {\r
                        window_.Create(sf::VideoMode(screen_width_, screen_height_, 32), narrow(print()), windowed_ ? sf::Style::Resize : sf::Style::Fullscreen);\r
index b3c5bd50755c4455267ff7a6ff8064d45419b37a..183ace3f3bbdef231d001c84795f4e95bc6fced5 100644 (file)
@@ -142,7 +142,6 @@ public:
 \r
        silverlight_producer(const safe_ptr<core::frame_factory>& frame_factory) : executor_(L"silverlight")\r
        {\r
-               executor_.start();\r
                executor_.invoke([=]\r
                {\r
                        renderer_.reset(new silverlight_renderer(frame_factory));\r
index d37a457f1ce83abc0fbe707e40e4000a8a5a90e4..53b41ae0ea8796c5f9e006e8ccc3a9220b68f11f 100644 (file)
@@ -43,7 +43,6 @@ const TCHAR CIIProtocolStrategy::TokenDelimiter = TEXT('\\');
 \r
 CIIProtocolStrategy::CIIProtocolStrategy(const std::vector<safe_ptr<core::channel>>& channels) : pChannel_(channels.at(0)), executor_(L"CIIProtocolStrategy")\r
 {\r
-       executor_.start();\r
 }\r
 \r
 void CIIProtocolStrategy::Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo) \r
index 23355fcc53f88ed3b70cccc52981ce1f75428e12..586eee9e395ad86221358ebc996eaa78aba9bd8a 100644 (file)
           <external-key>true</external-key>     <!-- [true/false]-->\r
           <key-only>false</key-only>            <!-- [true/false] - Copies key into fill channels. -->\r
         </decklink>\r
-        <!--<ogl>        \r
-          <device>1</device>\r
-          <stretch>uniform</stretch>\r
-          <windowed>true</windowed>\r
-          <output>key_only</output>\r
-        </ogl>-->\r
         <!--<audio>\r
         </audio>-->\r
         <!--<bluefish>\r
         </bluefish>-->\r
       </consumers>\r
     </channel>\r
+    <channel>\r
+      <videomode>1080i5000</videomode>\r
+      <consumers>\r
+        <audio/>\r
+        <ogl>\r
+          <device>1</device>\r
+          <stretch>uniform</stretch>\r
+          <windowed>true</windowed>\r
+          <output>key_only</output>\r
+        </ogl>\r
+      </consumers>\r
+    </channel>\r
 </channels>\r
   <controllers>\r
     <tcp>\r
index 018788706651bf98bb5be2565cb2cec1d0220381..48bf5e2263bbb21269d88616459229ef0a214703 100644 (file)
@@ -194,7 +194,19 @@ int main(int argc, wchar_t* argv[])
                                wcmd = L"CG 1-2 ADD 1 BBTELEFONARE 1";\r
                        else if(wcmd.substr(0, 1) == L"X")\r
                        {\r
-                               int num = boost::lexical_cast<int>(wcmd.substr(1, 2));\r
+                               int num = 0;\r
+                               std::wstring file;\r
+                               try\r
+                               {\r
+                                       num = boost::lexical_cast<int>(wcmd.substr(1, 2));\r
+                                       file = wcmd.substr(4, wcmd.length()-1);\r
+                               }\r
+                               catch(...)\r
+                               {\r
+                                       num = boost::lexical_cast<int>(wcmd.substr(1, 1));\r
+                                       file = wcmd.substr(3, wcmd.length()-1);\r
+                               }\r
+\r
                                int n = 0;\r
                                int num2 = num;\r
                                while(num2 > 0)\r
@@ -203,7 +215,6 @@ int main(int argc, wchar_t* argv[])
                                        n++;\r
                                }\r
 \r
-                               auto file = wcmd.substr(4, wcmd.length()-1);\r
                                wcmd = L"MIXER 1 VIDEO GRID " + boost::lexical_cast<std::wstring>(n);\r
 \r
                                for(int i = 1; i <= num; ++i)\r