#pragma once\r
\r
+#include "../exception/exceptions.h"\r
+\r
#include <boost/thread.hpp>\r
\r
#include <tbb/atomic.h>\r
class executor\r
{\r
public:\r
- executor(const std::function<void()>& run_func = nullptr) : run_func_(run_func)\r
+ explicit executor(const std::function<void()>& run_func = nullptr) : is_running_()\r
{\r
- is_running_ = false;\r
- if(run_func_ == nullptr) \r
- run_func_ = [=]{default_run();};\r
+ run_func_ = run_func != nullptr ? run_func : [=]{run();};\r
}\r
\r
virtual ~executor()\r
return is_running_;\r
}\r
\r
- bool stop(unsigned int timeout = 5000)\r
+ void stop()\r
{\r
if(is_running_.fetch_and_store(false))\r
{\r
- execution_queue_.push([](){});\r
- execute(false);\r
+ execution_queue_.clear();\r
+ execution_queue_.push([](){}); \r
}\r
- return thread_.timed_join(boost::posix_time::milliseconds(timeout));\r
+ thread_.join();\r
}\r
\r
- void execute(bool block = false)\r
+ void execute()\r
{\r
std::function<void()> func;\r
- if(block) \r
- {\r
- execution_queue_.pop(func); \r
+ execution_queue_.pop(func); \r
+ func();\r
+ }\r
+\r
+ bool try_execute()\r
+ {\r
+ std::function<void()> func;\r
+ if(execution_queue_.try_pop(func))\r
func();\r
- }\r
\r
- while(execution_queue_.try_pop(func))\r
- func(); \r
+ return func != nullptr;\r
+ }\r
+\r
+ void clear()\r
+ {\r
+ execution_queue_.clear();\r
+ }\r
+\r
+ template<typename Func>\r
+ void enqueue(Func&& func)\r
+ {\r
+ execution_queue_.push([=]{try{func();}catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}});\r
}\r
\r
template<typename Func>\r
auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>\r
{ \r
typedef decltype(func()) result_type; \r
-\r
- if(!is_running_)\r
- return boost::packaged_task<result_type>([]{ return result_type(); }).get_future();\r
-\r
- if(boost::this_thread::get_id() == thread_.get_id())\r
- return boost::packaged_task<result_type>([=]{ return func(); }).get_future();\r
-\r
- auto task = std::make_shared<boost::packaged_task<result_type>>([=]{ return is_running_ ? func() : result_type(); }); \r
+ \r
+ auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); \r
auto future = task->get_future();\r
-\r
- execution_queue_.push([=]{(*task)();});\r
+ \r
+ if(boost::this_thread::get_id() != thread_.get_id())\r
+ execution_queue_.push([=]{(*task)();});\r
+ else\r
+ (*task)();\r
\r
return std::move(future); \r
}\r
\r
private:\r
\r
- void default_run() throw()\r
+ virtual void run()\r
{\r
while(is_running_)\r
- execute(true);\r
+ execute();\r
}\r
\r
std::function<void()> run_func_;\r
#include <tbb/atomic.h>\r
#include <tbb/concurrent_queue.h>\r
\r
+#include <type_traits>\r
+\r
namespace caspar { namespace flash {\r
\r
using namespace boost::assign;\r
struct flash_producer::implementation\r
{ \r
implementation(flash_producer* self, const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) \r
- : flashax_container_(nullptr), monitor_(monitor), filename_(filename), self_(self), format_desc_(format_desc),\r
+ : flashax_container_(nullptr), filename_(filename), self_(self), format_desc_(format_desc), monitor_(monitor),\r
bitmap_pool_(new bitmap_pool), executor_([=]{run();})\r
{ \r
if(!boost::filesystem::exists(filename))\r
- BOOST_THROW_EXCEPTION(file_not_found() << \r
- boost::errinfo_file_name(common::narrow(filename)));\r
+ BOOST_THROW_EXCEPTION(file_not_found() << boost::errinfo_file_name(common::narrow(filename)));\r
\r
frame_buffer_.set_capacity(flash_producer::DEFAULT_BUFFER_SIZE);\r
last_frame_ = std::make_shared<bitmap_frame>(format_desc_.width, format_desc_.height);\r
start();\r
}\r
\r
+ ~implementation() \r
+ {\r
+ stop();\r
+ }\r
+\r
void start()\r
{ \r
try\r
{\r
if(FAILED(CComObject<FlashAxContainer>::CreateInstance(&flashax_container_)) || \r
flashax_container_ == nullptr)\r
- BOOST_THROW_EXCEPTION(caspar_exception()\r
- << msg_info("Failed to create FlashAxContainer"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to create FlashAxContainer"));\r
\r
flashax_container_->pflash_producer_ = self_;\r
CComPtr<IShockwaveFlash> spFlash;\r
\r
if(FAILED(flashax_container_->CreateAxControl()))\r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to Create FlashAxControl"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Create FlashAxControl"));\r
\r
if(FAILED(flashax_container_->QueryControl(&spFlash)))\r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to Query FlashAxControl"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Query FlashAxControl"));\r
\r
if(FAILED(spFlash->put_Playing(true)) )\r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to start playing Flash"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to start playing Flash"));\r
\r
if(FAILED(spFlash->put_Movie(CComBSTR(filename_.c_str()))))\r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to Load Template Host"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Load Template Host"));\r
\r
//Exact fit. Scale without respect to the aspect ratio.\r
if(FAILED(spFlash->put_ScaleMode(2))) \r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to Set Scale Mode"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Set Scale Mode"));\r
\r
// stop if failed\r
if(FAILED(flashax_container_->SetFormat(format_desc_))) \r
- BOOST_THROW_EXCEPTION(caspar_exception() \r
- << msg_info("Failed to Set Format"));\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Failed to Set Format"));\r
\r
current_frame_ = nullptr; // Force re-render of current frame \r
});\r
}\r
}\r
\r
- ~implementation() \r
- {\r
- stop();\r
- }\r
-\r
void stop()\r
{\r
is_empty_ = true;\r
- try \r
- {\r
- if(executor_.is_running())\r
- {\r
- frame_buffer_.clear();\r
- if(!executor_.stop(flash_producer::STOP_TIMEOUT)) // Could be interrupted\r
- CASPAR_LOG(warning) << "Timed-out. Continuing execution.";\r
- }\r
- }\r
- catch(...)\r
+ if(executor_.is_running())\r
{\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ frame_buffer_.clear();\r
+ executor_.stop();\r
}\r
}\r
\r
- bool param(const std::wstring& param) \r
- { \r
- try\r
+ void param(const std::wstring& param) \r
+ { \r
+ if(!executor_.is_running())\r
{\r
- if(!executor_.is_running())\r
+ try\r
{\r
- try\r
- {\r
- start();\r
- }\r
- catch(caspar_exception& e)\r
- {\r
- e << msg_info("Failed to restart");\r
- throw e;\r
- }\r
+ start();\r
+ }\r
+ catch(caspar_exception& e)\r
+ {\r
+ e << msg_info("Flashproducer failed to recover from failure.");\r
+ throw e;\r
}\r
-\r
- executor_.invoke([&]\r
- { \r
- is_empty_ = false;\r
- for(int retries = 0; retries < flash_producer::MAX_PARAM_RETRIES; ++retries)\r
- {\r
- if(retries > 0)\r
- CASPAR_LOG(debug) << "Retrying. Count: " << retries;\r
-\r
- retries = flashax_container_->CallFunction(param) ? \r
- flash_producer::MAX_PARAM_RETRIES : retries;\r
- }\r
- });\r
- return true;\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
- \r
- return false;\r
- }\r
-\r
\r
+ executor_.invoke([&]()\r
+ { \r
+ for(size_t retries = 0; !flashax_container_->CallFunction(param); ++retries)\r
+ {\r
+ CASPAR_LOG(debug) << "Retrying. Count: " << retries;\r
+ if(retries > 3)\r
+ BOOST_THROW_EXCEPTION(operation_failed() << warg_name_info(L"param") << warg_value_info(param));\r
+ }\r
+ is_empty_ = false; \r
+ });\r
+ }\r
+ \r
void run()\r
{\r
win32_exception::install_handler();\r
if(!is_empty_)\r
{\r
render(); \r
- executor_.execute(false);\r
+ while(executor_.try_execute()){}\r
}\r
else\r
- executor_.execute(true);\r
+ {\r
+ executor_.execute();\r
+ while(executor_.try_execute()){}\r
+ }\r
}\r
}\r
catch(...)\r
flashax_container_->DrawControl(frame->hdc());\r
\r
auto pool = bitmap_pool_;\r
- current_frame_.reset(frame.get(), [=](bitmap_frame*){common::function_task::enqueue([=]{try{pool->try_push(clear_frame(frame));}catch(...){}});});\r
+ current_frame_.reset(frame.get(), [=](bitmap_frame*)\r
+ {\r
+ common::function_task::enqueue([=]\r
+ {\r
+ try\r
+ {\r
+ pool->try_push(clear_frame(frame));\r
+ }\r
+ catch(...){}\r
+ });\r
+ });\r
} \r
return current_frame_;\r
}\r
{\r
return frame_buffer_.try_pop(last_frame_) || !is_empty_ ? last_frame_ : frame::null();\r
}\r
-\r
+ \r
typedef tbb::concurrent_bounded_queue<bitmap_frame_ptr> bitmap_pool;\r
std::shared_ptr<bitmap_pool> bitmap_pool_;\r
frame_format_desc format_desc_;\r
tbb::concurrent_bounded_queue<frame_ptr> frame_buffer_;\r
frame_ptr last_frame_;\r
frame_ptr current_frame_;\r
-\r
- Monitor* monitor_;\r
-\r
+ \r
std::wstring filename_;\r
flash_producer* self_;\r
+ Monitor* monitor_;\r
\r
tbb::atomic<bool> is_empty_;\r
common::executor executor_;\r
};\r
\r
-flash_producer::flash_producer(const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) \r
- : impl_(new implementation(this, filename, format_desc, monitor))\r
-{ \r
-}\r
-\r
+flash_producer::flash_producer(const std::wstring& filename, const frame_format_desc& format_desc, Monitor* monitor) : impl_(new implementation(this, filename, format_desc, monitor)){}\r
frame_ptr flash_producer::get_frame(){return impl_->get_frame();}\r
-bool flash_producer::param(const std::wstring& param){return impl_->param(param);}\r
-Monitor* flash_producer::get_monitor(){return impl_->monitor_;}\r
+Monitor* flash_producer::get_monitor(){return impl_->monitor_; }\r
+void flash_producer::param(const std::wstring& param){impl_->param(param);}\r
const frame_format_desc& flash_producer::get_frame_format_desc() const { return impl_->format_desc_; } \r
\r
std::wstring flash_producer::find_template(const std::wstring& template_name)\r
return TEXT("");\r
}\r
\r
-flash_producer_ptr create_flash_producer(const std::vector<std::wstring>& params, \r
- const frame_format_desc& format_desc)\r
+flash_producer_ptr create_flash_producer(const std::vector<std::wstring>& params, const frame_format_desc& format_desc)\r
{\r
// TODO: Check for flash support\r
auto filename = params[0];\r