]> git.sesse.net Git - casparcg/commitdiff
Fixed a deadlock problem in executor.cpp.
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 15 Sep 2010 10:35:35 +0000 (10:35 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Wed, 15 Sep 2010 10:35:35 +0000 (10:35 +0000)
Updated flashproducer.

git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.0.0@120 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

common/concurrency/executor.h
common/exception/exceptions.h
server/producer/flash/flash_producer.cpp
server/producer/flash/flash_producer.h

index 5fb6ef521d39c2dc4d678351a0b31dbecc7bbe89..8d22e8e64243f196c451e8ae532d0fe2ee2b0061 100644 (file)
@@ -1,5 +1,7 @@
 #pragma once\r
 \r
+#include "../exception/exceptions.h"\r
+\r
 #include <boost/thread.hpp>\r
 \r
 #include <tbb/atomic.h>\r
@@ -12,11 +14,9 @@ namespace caspar { namespace common {
 class executor\r
 {\r
 public:\r
-       executor(const std::function<void()>& run_func = nullptr) : run_func_(run_func)\r
+       explicit executor(const std::function<void()>& run_func = nullptr) : is_running_()\r
        {\r
-               is_running_ = false;\r
-               if(run_func_ == nullptr) \r
-                       run_func_ = [=]{default_run();};\r
+               run_func_ = run_func != nullptr ? run_func : [=]{run();};\r
        }\r
 \r
        virtual ~executor()\r
@@ -36,44 +36,55 @@ public:
                return is_running_;\r
        }\r
 \r
-       bool stop(unsigned int timeout = 5000)\r
+       void stop()\r
        {\r
                if(is_running_.fetch_and_store(false))\r
                {\r
-                       execution_queue_.push([](){});\r
-                       execute(false);\r
+                       execution_queue_.clear();\r
+                       execution_queue_.push([](){});                  \r
                }\r
-               return thread_.timed_join(boost::posix_time::milliseconds(timeout));\r
+               thread_.join();\r
        }\r
 \r
-       void execute(bool block = false)\r
+       void execute()\r
        {\r
                std::function<void()> func;\r
-               if(block)               \r
-               {\r
-                       execution_queue_.pop(func);     \r
+               execution_queue_.pop(func);     \r
+               func();\r
+       }\r
+\r
+       bool try_execute()\r
+       {\r
+               std::function<void()> func;\r
+               if(execution_queue_.try_pop(func))\r
                        func();\r
-               }\r
 \r
-               while(execution_queue_.try_pop(func))\r
-                       func();         \r
+               return func != nullptr;\r
+       }\r
+\r
+       void clear()\r
+       {\r
+               execution_queue_.clear();\r
+       }\r
+\r
+       template<typename Func>\r
+       void enqueue(Func&& func)\r
+       {\r
+               execution_queue_.push([=]{try{func();}catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}});\r
        }\r
 \r
        template<typename Func>\r
        auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>\r
        {       \r
                typedef decltype(func()) result_type; \r
-\r
-               if(!is_running_)\r
-                       return boost::packaged_task<result_type>([]{ return result_type(); }).get_future();\r
-\r
-               if(boost::this_thread::get_id() == thread_.get_id())\r
-                       return boost::packaged_task<result_type>([=]{ return func(); }).get_future();\r
-\r
-               auto task = std::make_shared<boost::packaged_task<result_type>>([=]{ return is_running_ ? func() : result_type(); });   \r
+                               \r
+               auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func));      \r
                auto future = task->get_future();\r
-\r
-               execution_queue_.push([=]{(*task)();});\r
+               \r
+               if(boost::this_thread::get_id() != thread_.get_id())\r
+                       execution_queue_.push([=]{(*task)();});\r
+               else\r
+                       (*task)();\r
 \r
                return std::move(future);               \r
        }\r
@@ -86,10 +97,10 @@ public:
        \r
 private:\r
 \r
-       void default_run() throw()\r
+       virtual void run()\r
        {\r
                while(is_running_)\r
-                       execute(true);\r
+                       execute();\r
        }\r
 \r
        std::function<void()> run_func_;\r
index f81878993b8909ef7d385c97ee56a2fe7497479e..18127c772381af9033a1b5a309bb127fe6c95074 100644 (file)
@@ -9,6 +9,8 @@ namespace caspar {
 \r
 typedef boost::error_info<struct tag_arg_name_info, std::string> arg_name_info;\r
 typedef boost::error_info<struct tag_arg_value_info, std::string> arg_value_info;\r
+typedef boost::error_info<struct tag_arg_name_info, std::wstring> warg_name_info;\r
+typedef boost::error_info<struct tag_arg_value_info, std::wstring> warg_value_info;\r
 typedef boost::error_info<struct tag_msg_info, std::string> msg_info;\r
 typedef boost::error_info<struct tag_inner_info, std::exception_ptr> inner_info;\r
 typedef boost::error_info<struct tag_line_info, int> line_info;\r
@@ -25,6 +27,7 @@ struct null_argument                  : virtual invalid_argument {};
 struct out_of_range                            : virtual invalid_argument {};\r
 \r
 struct invalid_operation               : virtual caspar_exception {};\r
+struct operation_failed                        : virtual caspar_exception {};\r
 \r
 struct not_supported                   : virtual caspar_exception {};\r
 struct not_implemented                 : virtual caspar_exception {};\r
index 3e1c648019ba1928d5e6a8e249c788b694e5769e..c74efd303b36e923dd7e13f79eb6fb28350b20bb 100644 (file)
@@ -45,6 +45,8 @@
 #include <tbb/atomic.h>\r
 #include <tbb/concurrent_queue.h>\r
 \r
+#include <type_traits>\r
+\r
 namespace caspar { namespace flash {\r
 \r
 using namespace boost::assign;\r
@@ -56,12 +58,11 @@ extern __declspec(selectany) CAtlModule* _pAtlModule = &_AtlModule;
 struct flash_producer::implementation\r
 {      \r
        implementation(flash_producer* self, const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) \r
-               : flashax_container_(nullptr), monitor_(monitor),       filename_(filename), self_(self), format_desc_(format_desc),\r
+               : flashax_container_(nullptr), filename_(filename), self_(self), format_desc_(format_desc), monitor_(monitor),\r
                        bitmap_pool_(new bitmap_pool), executor_([=]{run();})\r
        {       \r
        if(!boost::filesystem::exists(filename))\r
-               BOOST_THROW_EXCEPTION(file_not_found() << \r
-                                                                       boost::errinfo_file_name(common::narrow(filename)));\r
+               BOOST_THROW_EXCEPTION(file_not_found() << boost::errinfo_file_name(common::narrow(filename)));\r
 \r
                frame_buffer_.set_capacity(flash_producer::DEFAULT_BUFFER_SIZE);\r
                last_frame_ = std::make_shared<bitmap_frame>(format_desc_.width, format_desc_.height);\r
@@ -69,6 +70,11 @@ struct flash_producer::implementation
                start();\r
        }\r
 \r
+       ~implementation() \r
+       {\r
+               stop();\r
+       }\r
+\r
        void start()\r
        {               \r
                try\r
@@ -80,37 +86,30 @@ struct flash_producer::implementation
                        {\r
                                if(FAILED(CComObject<FlashAxContainer>::CreateInstance(&flashax_container_)) || \r
                                                        flashax_container_ == nullptr)\r
-                                       BOOST_THROW_EXCEPTION(caspar_exception()\r
-                                                                                       << msg_info("Failed to create FlashAxContainer"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to create FlashAxContainer"));\r
                \r
                                flashax_container_->pflash_producer_ = self_;\r
                                CComPtr<IShockwaveFlash> spFlash;\r
 \r
                                if(FAILED(flashax_container_->CreateAxControl()))\r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to Create FlashAxControl"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Create FlashAxControl"));\r
 \r
                                if(FAILED(flashax_container_->QueryControl(&spFlash)))\r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to Query FlashAxControl"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Query FlashAxControl"));\r
                                                                                                \r
                                if(FAILED(spFlash->put_Playing(true)) )\r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to start playing Flash"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to start playing Flash"));\r
 \r
                                if(FAILED(spFlash->put_Movie(CComBSTR(filename_.c_str()))))\r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to Load Template Host"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Load Template Host"));\r
                                                \r
                                //Exact fit. Scale without respect to the aspect ratio.\r
                                if(FAILED(spFlash->put_ScaleMode(2))) \r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to Set Scale Mode"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Set Scale Mode"));\r
                                                                                                \r
                                // stop if failed\r
                                if(FAILED(flashax_container_->SetFormat(format_desc_))) \r
-                                       BOOST_THROW_EXCEPTION(caspar_exception() \r
-                                                                                       << msg_info("Failed to Set Format"));\r
+                                       BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Set Format"));\r
 \r
                                current_frame_ = nullptr; // Force re-render of current frame                   \r
                        });\r
@@ -123,69 +122,43 @@ struct flash_producer::implementation
                }\r
        }\r
 \r
-       ~implementation() \r
-       {\r
-               stop();\r
-       }\r
-\r
        void stop()\r
        {\r
                is_empty_ = true;\r
-               try \r
-               {\r
-                       if(executor_.is_running())\r
-                       {\r
-                               frame_buffer_.clear();\r
-                               if(!executor_.stop(flash_producer::STOP_TIMEOUT)) // Could be interrupted\r
-                                       CASPAR_LOG(warning) << "Timed-out. Continuing execution.";\r
-                       }\r
-               }\r
-               catch(...)\r
+               if(executor_.is_running())\r
                {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       frame_buffer_.clear();\r
+                       executor_.stop();\r
                }\r
        }\r
 \r
-       bool param(const std::wstring& param) \r
-       {               \r
-               try\r
+       void param(const std::wstring& param) \r
+       {       \r
+               if(!executor_.is_running())\r
                {\r
-                       if(!executor_.is_running())\r
+                       try\r
                        {\r
-                               try\r
-                               {\r
-                                       start();\r
-                               }\r
-                               catch(caspar_exception& e)\r
-                               {\r
-                                       e << msg_info("Failed to restart");\r
-                                       throw e;\r
-                               }\r
+                               start();\r
+                       }\r
+                       catch(caspar_exception& e)\r
+                       {\r
+                               e << msg_info("Flashproducer failed to recover from failure.");\r
+                               throw e;\r
                        }\r
-\r
-                       executor_.invoke([&]\r
-                       {       \r
-                               is_empty_ = false;\r
-                               for(int retries = 0; retries < flash_producer::MAX_PARAM_RETRIES; ++retries)\r
-                               {\r
-                                       if(retries > 0)\r
-                                               CASPAR_LOG(debug) << "Retrying. Count: " << retries;\r
-\r
-                                       retries = flashax_container_->CallFunction(param) ? \r
-                                                               flash_producer::MAX_PARAM_RETRIES : retries;\r
-                               }\r
-                       });\r
-                       return true;\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
-               \r
-               return false;\r
-       }\r
-\r
 \r
+               executor_.invoke([&]()\r
+               {                       \r
+                       for(size_t retries = 0; !flashax_container_->CallFunction(param); ++retries)\r
+                       {\r
+                               CASPAR_LOG(debug) << "Retrying. Count: " << retries;\r
+                               if(retries > 3)\r
+                                       BOOST_THROW_EXCEPTION(operation_failed() << warg_name_info(L"param") << warg_value_info(param));\r
+                       }\r
+                       is_empty_ = false;      \r
+               });\r
+       }\r
+       \r
        void run()\r
        {\r
                win32_exception::install_handler();\r
@@ -221,10 +194,13 @@ struct flash_producer::implementation
                                if(!is_empty_)\r
                                {\r
                                        render();       \r
-                                       executor_.execute(false);\r
+                                       while(executor_.try_execute()){}\r
                                }\r
                                else\r
-                                       executor_.execute(true);\r
+                               {\r
+                                       executor_.execute();\r
+                                       while(executor_.try_execute()){}\r
+                               }\r
                        }\r
                }\r
                catch(...)\r
@@ -273,7 +249,17 @@ struct flash_producer::implementation
                        flashax_container_->DrawControl(frame->hdc());\r
 \r
                        auto pool = bitmap_pool_;\r
-                       current_frame_.reset(frame.get(), [=](bitmap_frame*){common::function_task::enqueue([=]{try{pool->try_push(clear_frame(frame));}catch(...){}});});\r
+                       current_frame_.reset(frame.get(), [=](bitmap_frame*)\r
+                       {\r
+                               common::function_task::enqueue([=]\r
+                               {\r
+                                       try\r
+                                       {\r
+                                               pool->try_push(clear_frame(frame));\r
+                                       }\r
+                                       catch(...){}\r
+                               });\r
+                       });\r
                }       \r
                return current_frame_;\r
        }\r
@@ -282,7 +268,7 @@ struct flash_producer::implementation
        {\r
                return frame_buffer_.try_pop(last_frame_) || !is_empty_ ? last_frame_ : frame::null();\r
        }\r
-\r
+       \r
        typedef tbb::concurrent_bounded_queue<bitmap_frame_ptr> bitmap_pool;\r
        std::shared_ptr<bitmap_pool> bitmap_pool_;\r
        frame_format_desc format_desc_;\r
@@ -292,24 +278,19 @@ struct flash_producer::implementation
        tbb::concurrent_bounded_queue<frame_ptr> frame_buffer_;\r
        frame_ptr last_frame_;\r
        frame_ptr current_frame_;\r
-\r
-       Monitor* monitor_;\r
-\r
+       \r
        std::wstring filename_;\r
        flash_producer* self_;\r
+       Monitor* monitor_;\r
 \r
        tbb::atomic<bool> is_empty_;\r
        common::executor executor_;\r
 };\r
 \r
-flash_producer::flash_producer(const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) \r
-       : impl_(new implementation(this, filename, format_desc, monitor))\r
-{      \r
-}\r
-\r
+flash_producer::flash_producer(const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) : impl_(new implementation(this, filename, format_desc, monitor)){}\r
 frame_ptr flash_producer::get_frame(){return impl_->get_frame();}\r
-bool flash_producer::param(const std::wstring& param){return impl_->param(param);}\r
-Monitor* flash_producer::get_monitor(){return impl_->monitor_;}\r
+Monitor* flash_producer::get_monitor(){return impl_->monitor_; }\r
+void flash_producer::param(const std::wstring& param){impl_->param(param);}\r
 const frame_format_desc& flash_producer::get_frame_format_desc() const { return impl_->format_desc_; } \r
 \r
 std::wstring flash_producer::find_template(const std::wstring& template_name)\r
@@ -323,8 +304,7 @@ std::wstring flash_producer::find_template(const std::wstring& template_name)
        return TEXT("");\r
 }\r
 \r
-flash_producer_ptr create_flash_producer(const std::vector<std::wstring>& params, \r
-                                                                               const frame_format_desc& format_desc)\r
+flash_producer_ptr create_flash_producer(const std::vector<std::wstring>& params, const frame_format_desc& format_desc)\r
 {\r
        // TODO: Check for flash support\r
        auto filename = params[0];\r
index 80c132f9ae3d853eb16d5f5e03526ef178493c18..0a815af5cd7e67980b80c1b27957772c7562b6e1 100644 (file)
@@ -46,7 +46,7 @@ public:
        frame_ptr get_frame();\r
        const frame_format_desc& get_frame_format_desc() const;\r
 \r
-       bool param(const std::wstring& param);\r
+       void param(const std::wstring& param);\r
        \r
        static std::wstring find_template(const std::wstring& templateName);\r
 \r