<ItemGroup>\r
<ClInclude Include="assert.h" />\r
<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
- <ClInclude Include="concurrency\defer.h" />\r
+ <ClInclude Include="concurrency\async.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
<ClInclude Include="concurrency\lock.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="reactive.h">\r
<Filter>source</Filter>\r
</ClInclude>\r
- <ClInclude Include="concurrency\defer.h">\r
- <Filter>source\concurrency</Filter>\r
- </ClInclude>\r
<ClInclude Include="except.h">\r
<Filter>source</Filter>\r
</ClInclude>\r
<ClInclude Include="tweener.h">\r
<Filter>source</Filter>\r
</ClInclude>\r
+ <ClInclude Include="concurrency\async.h">\r
+ <Filter>source\concurrency</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
--- /dev/null
+#pragma once\r
+\r
+#include <functional>\r
+#include <memory>\r
+\r
+#include <boost/thread/future.hpp>\r
+#include <boost/thread/thread.hpp>\r
+\r
+#include <boost/utility/declval.hpp>\r
+\r
+#include "../enum_class.h"\r
+\r
+namespace caspar {\r
+\r
+struct launch_policy_def\r
+{\r
+ enum type\r
+ {\r
+ async,\r
+ deferred\r
+ };\r
+};\r
+typedef enum_class<launch_policy_def> launch_policy;\r
+\r
+namespace detail {\r
+\r
+template<typename R>\r
+struct invoke_function\r
+{ \r
+ template<typename F>\r
+ void operator()(boost::promise<R>& p, F& f)\r
+ {\r
+ p.set_value(f());\r
+ }\r
+};\r
+\r
+template<>\r
+struct invoke_function<void>\r
+{ \r
+ template<typename F>\r
+ void operator()(boost::promise<void>& p, F& f)\r
+ {\r
+ f();\r
+ p.set_value();\r
+ }\r
+};\r
+\r
+}\r
+ \r
+template<typename F>\r
+auto async(launch_policy lp, F&& f) -> boost::unique_future<decltype(f())>\r
+{ \r
+ typedef decltype(f()) result_type;\r
+\r
+ if(lp == launch_policy::deferred)\r
+ {\r
+ typedef boost::promise<result_type> promise_t;\r
+\r
+ auto promise = new promise_t();\r
+ auto future = promise->get_future();\r
+ \r
+ promise->set_wait_callback(std::function<void(promise_t&)>([=](promise_t&) mutable\r
+ {\r
+ std::unique_ptr<promise_t> pointer_guard(promise);\r
+ detail::invoke_function<result_type>()(*promise, f);\r
+ }));\r
+\r
+ return std::move(future);\r
+ }\r
+ else\r
+ {\r
+ typedef boost::packaged_task<result_type> packaged_task_t;\r
+\r
+ auto task = packaged_task_t(f); \r
+ auto future = task.get_future();\r
+\r
+ boost::thread(std::move(task)).detach();\r
+\r
+ return std::move(future);\r
+ }\r
+}\r
+ \r
+template<typename F>\r
+auto async(F&& f) -> boost::unique_future<decltype(f())>\r
+{ \r
+ return async(launch_policy::async, std::forward<F>(f));\r
+}\r
+\r
+}
\ No newline at end of file
+++ /dev/null
-#pragma once\r
-\r
-#include <functional>\r
-#include <memory>\r
-\r
-#include <boost/thread/future.hpp>\r
-\r
-namespace caspar {\r
- \r
-template<typename F>\r
-auto defer(F&& f) -> boost::unique_future<decltype(f())>\r
-{ \r
- typedef boost::promise<decltype(f())> promise_t;\r
- auto p = new promise_t();\r
-\r
- auto func = [=](promise_t&) mutable\r
- {\r
- std::unique_ptr<promise_t> guard(p);\r
- p->set_value(f());\r
- };\r
-\r
- p->set_wait_callback(std::function<void(promise_t&)>(func));\r
-\r
- return p->get_future();\r
-}\r
-\r
-\r
-}
\ No newline at end of file
\r
#include "log.h"\r
\r
+#include "except.h"\r
#include "utf.h"\r
\r
-#include "exception/exceptions.h"\r
-\r
#include <ios>\r
#include <string>\r
#include <ostream>\r
#include "../device_buffer.h"\r
\r
#include <common/gl/gl_check.h>\r
-#include <common/concurrency/defer.h>\r
+#include <common/concurrency/async.h>\r
\r
#include <core/frame/frame_transform.h>\r
#include <core/frame/pixel_format.h>\r
return result;\r
});\r
\r
- return defer([=]() mutable -> boost::iterator_range<const uint8_t*>\r
+ return async(launch_policy::deferred, [=]() mutable -> boost::iterator_range<const uint8_t*>\r
{\r
auto ptr = reinterpret_cast<const uint8_t*>(buffer.get()->data()); // .get() and ->data() can block calling thread, ->data() can also block OpenGL thread, defer it as long as possible.\r
return boost::iterator_range<const uint8_t*>(ptr, ptr + buffer.get()->size());\r
#include "color/color_producer.h"\r
#include "separated/separated_producer.h"\r
\r
-#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/executor.h>\r
+#include <common/assert.h>\r
#include <common/except.h>\r
-#include <common/utility/move_on_copy.h>\r
+#include <common/concurrency/executor.h>\r
+#include <common/concurrency/async.h>\r
+#include <common/memory/safe_ptr.h>\r
\r
namespace caspar { namespace core {\r
\r
\r
~destroy_producer_proxy()\r
{ \r
- static auto destroyers = std::make_shared<tbb::concurrent_bounded_queue<std::shared_ptr<executor>>>();\r
- static tbb::atomic<int> destroyer_count;\r
+ static tbb::atomic<int> counter = tbb::atomic<int>();\r
\r
try\r
- {\r
- std::shared_ptr<executor> destroyer;\r
- if(!destroyers->try_pop(destroyer))\r
- {\r
- destroyer.reset(new executor(L"destroyer"));\r
- destroyer->set_priority_class(below_normal_priority_class);\r
- if(++destroyer_count > 16)\r
- CASPAR_LOG(warning) << L"Potential destroyer dead-lock detected.";\r
- CASPAR_LOG(trace) << "Created destroyer: " << destroyer_count;\r
- }\r
- \r
+ { \r
auto producer = producer_.release();\r
- auto pool = destroyers;\r
- destroyer->begin_invoke([=]\r
+ ++counter;\r
+ CASPAR_VERIFY(counter < 32);\r
+\r
+ async([=]\r
{\r
std::unique_ptr<std::shared_ptr<frame_producer>> producer2(producer);\r
\r
catch(...){}\r
\r
producer2.reset();\r
- pool->push(destroyer);\r
+\r
+ --counter;\r
}); \r
}\r
catch(...)\r
return make_safe<print_producer_proxy>(std::move(producer));\r
}\r
\r
-class last_frame_producer : public frame_producer\r
-{\r
- const std::wstring print_;\r
- const safe_ptr<draw_frame> frame_;\r
- const uint32_t nb_frames_;\r
-public:\r
- last_frame_producer(const safe_ptr<frame_producer>& producer) \r
- : print_(producer->print())\r
- , frame_(producer->last_frame() != draw_frame::eof() ? producer->last_frame() : draw_frame::empty())\r
- , nb_frames_(producer->nb_frames())\r
- {\r
- }\r
- \r
- virtual safe_ptr<draw_frame> receive(int){return frame_;}\r
- virtual safe_ptr<core::draw_frame> last_frame() const{return frame_;}\r
- virtual std::wstring print() const{return L"dummy[" + print_ + L"]";}\r
- virtual uint32_t nb_frames() const {return nb_frames_;} \r
- virtual boost::property_tree::wptree info() const override\r
- {\r
- boost::property_tree::wptree info;\r
- info.add(L"type", L"last-frame-producer");\r
- return info;\r
- }\r
-};\r
-\r
struct empty_frame_producer : public frame_producer\r
{\r
virtual safe_ptr<draw_frame> receive(int){return draw_frame::empty();}\r
following->set_leading_producer(producer);\r
producer = std::move(following);\r
}\r
- else\r
- producer = make_safe<last_frame_producer>(producer);\r
\r
return receive_and_follow(producer, hints);\r
}\r
};\r
typedef enum_class<flags_def> flags;\r
\r
- frame_producer(){}\r
virtual ~frame_producer(){} \r
\r
virtual std::wstring print() const = 0; // nothrow\r
is_paused_ = false;\r
}\r
\r
- void load(const safe_ptr<frame_producer>& producer, const boost::optional<int32_t>& auto_play_delta)\r
+ void load(safe_ptr<frame_producer> producer, const boost::optional<int32_t>& auto_play_delta)\r
{ \r
- background_ = producer;\r
+ background_ = std::move(producer);\r
auto_play_delta_ = auto_play_delta;\r
\r
if(auto_play_delta_ && foreground_ == frame_producer::empty())\r
if(background_ != frame_producer::empty())\r
{\r
background_->set_leading_producer(foreground_);\r
- \r
- foreground_ = background_;\r
- background_ = frame_producer::empty();\r
- frame_number_ = 0;\r
+ foreground_ = background_;\r
+ background_ = frame_producer::empty();\r
+ frame_number_ = 0;\r
auto_play_delta_.reset();\r
}\r
\r
- is_paused_ = false;\r
+ resume();\r
}\r
\r
void stop()\r
{\r
- foreground_ = frame_producer::empty();\r
- background_ = background_;\r
- frame_number_ = 0;\r
+ foreground_ = frame_producer::empty();\r
+ frame_number_ = 0;\r
auto_play_delta_.reset();\r
\r
- is_paused_ = true;\r
+ pause();\r
}\r
\r
safe_ptr<draw_frame> receive(frame_producer::flags flags)\r
}\r
}\r
\r
- bool empty() const\r
- {\r
- return background_ == core::frame_producer::empty() && foreground_ == core::frame_producer::empty();\r
- }\r
-\r
boost::property_tree::wptree info() const\r
{\r
boost::property_tree::wptree info;\r
{ \r
impl_.swap(other.impl_);\r
}\r
-void layer::load(const safe_ptr<frame_producer>& frame_producer, const boost::optional<int32_t>& auto_play_delta){return impl_->load(frame_producer, auto_play_delta);} \r
+void layer::load(safe_ptr<frame_producer> frame_producer, const boost::optional<int32_t>& auto_play_delta){return impl_->load(std::move(frame_producer), auto_play_delta);} \r
void layer::play(){impl_->play();}\r
void layer::pause(){impl_->pause();}\r
void layer::stop(){impl_->stop();}\r
safe_ptr<draw_frame> layer::receive(frame_producer::flags flags) {return impl_->receive(flags);}\r
safe_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}\r
safe_ptr<frame_producer> layer::background() const { return impl_->background_;}\r
-bool layer::empty() const {return impl_->empty();}\r
boost::property_tree::wptree layer::info() const{return impl_->info();}\r
}}
\ No newline at end of file
\r
void swap(layer& other); // nothrow \r
\r
- void load(const safe_ptr<struct frame_producer>& producer, const boost::optional<int32_t>& auto_play_delta = nullptr); // nothrow\r
+ void load(safe_ptr<struct frame_producer> producer, const boost::optional<int32_t>& auto_play_delta = nullptr); // nothrow\r
void play(); // nothrow\r
void pause(); // nothrow\r
void stop(); // nothrow\r
\r
- bool empty() const;\r
-\r
safe_ptr<struct frame_producer> foreground() const; // nothrow\r
safe_ptr<struct frame_producer> background() const; // nothrow\r
\r
\r
boost::unique_future<boost::property_tree::wptree> info(int index)\r
{\r
- return executor_.begin_invoke([=]() -> boost::property_tree::wptree\r
+ return executor_.begin_invoke([=]\r
{\r
return layers_[index].info();\r
}, high_priority);\r
if(format_desc.format == core::video_format::invalid)\r
format_desc = frame_factory->get_video_format_desc();\r
\r
- return create_producer_print_proxy(\r
- create_producer_destroy_proxy(\r
+ return create_producer_destroy_proxy(\r
+ create_producer_print_proxy(\r
make_safe<decklink_producer_proxy>(frame_factory, format_desc, device_index, filter_str, length)));\r
}\r
\r
boost::replace_all(filter_str, L"DEINTERLACE", L"YADIF=0:-1");\r
boost::replace_all(filter_str, L"DEINTERLACE_BOB", L"YADIF=1:-1");\r
\r
- return create_producer_destroy_proxy(make_safe<ffmpeg_producer>(frame_factory, filename, filter_str, loop, start, length));\r
+ return create_producer_destroy_proxy(\r
+ create_producer_print_proxy(\r
+ make_safe<ffmpeg_producer>(frame_factory, filename, filter_str, loop, start, length)));\r
}\r
\r
}}
\ No newline at end of file