-#pragma once\r
-\r
-#include "enum_class.h"\r
-\r
-#include <boost/thread/future.hpp>\r
-#include <boost/thread/thread.hpp>\r
-#include <boost/shared_ptr.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
- \r
-struct launch_policy_def\r
-{\r
- enum type\r
- {\r
- async = 1,\r
- deferred = 2\r
- };\r
-};\r
-typedef caspar::enum_class<launch_policy_def> launch;\r
-\r
-namespace detail {\r
- \r
-template<typename R>\r
-struct future_object_helper\r
-{ \r
- template<typename T, typename F>\r
- static void nonlocking_invoke(T& future_object, F& f)\r
- { \r
- try\r
- {\r
- future_object.mark_finished_with_result_internal(f());\r
- }\r
- catch(...)\r
- {\r
- future_object.mark_exceptional_finish_internal(boost::current_exception());\r
- }\r
- }\r
-\r
- template<typename T, typename F>\r
- static void locking_invoke(T& future_object, F& f)\r
- { \r
- try\r
- {\r
- future_object.mark_finished_with_result(f());\r
- }\r
- catch(...)\r
- {\r
- future_object.mark_exceptional_finish();\r
- }\r
- }\r
-};\r
-\r
-template<>\r
-struct future_object_helper<void>\r
-{ \r
- template<typename T, typename F>\r
- static void nonlocking_invoke(T& future_object, F& f)\r
- { \r
- try\r
- {\r
- f();\r
- future_object.mark_finished_with_result_internal();\r
- }\r
- catch(...)\r
- {\r
- future_object.mark_exceptional_finish_internal(boost::current_exception());\r
- }\r
- }\r
-\r
- template<typename T, typename F>\r
- static void locking_invoke(T& future_object, F& f)\r
- { \r
- try\r
- {\r
- f();\r
- future_object.mark_finished_with_result();\r
- }\r
- catch(...)\r
- {\r
- future_object.mark_exceptional_finish();\r
- }\r
- }\r
-};\r
-\r
-template<typename R, typename F>\r
-struct deferred_future_object : public boost::detail::future_object<R>\r
-{ \r
- F f;\r
- bool done;\r
-\r
- template<typename F2>\r
- deferred_future_object(F2&& f)\r
- : f(std::forward<F2>(f))\r
- , done(false)\r
- {\r
- set_wait_callback(std::mem_fn(&detail::deferred_future_object<R, F>::operator()), this);\r
- }\r
- \r
- void operator()()\r
- { \r
- boost::lock_guard<boost::mutex> lock2(mutex);\r
-\r
- if(done)\r
- return;\r
-\r
- future_object_helper<R>::nonlocking_invoke(*this, f);\r
-\r
- done = true;\r
- }\r
-};\r
-\r
-template<typename R, typename F>\r
-struct async_future_object : public boost::detail::future_object<R>\r
-{ \r
- F f;\r
- boost::thread thread;\r
-\r
- template<typename F2>\r
- async_future_object(F2&& f)\r
- : f(std::forward<F2>(f))\r
- , thread([this]{run();})\r
- {\r
- }\r
-\r
- ~async_future_object()\r
- {\r
- thread.join();\r
- }\r
-\r
- void run()\r
- {\r
- future_object_helper<R>::locking_invoke(*this, f);\r
- }\r
-};\r
-\r
-}\r
- \r
-template<typename F>\r
-auto async(launch policy, F&& f) -> boost::unique_future<decltype(f())>\r
-{ \r
- typedef decltype(f()) result_type; \r
- typedef boost::detail::future_object<result_type> future_object_type;\r
-\r
- boost::shared_ptr<future_object_type> future_object;\r
-\r
- if((policy & launch::async) != 0)\r
- future_object = boost::static_pointer_cast<future_object_type>(boost::make_shared<detail::async_future_object<result_type, F>>(std::forward<F>(f)));\r
- else if((policy & launch::deferred) != 0)\r
- future_object = boost::static_pointer_cast<future_object_type>(boost::make_shared<detail::deferred_future_object<result_type, F>>(std::forward<F>(f))); \r
- else\r
- throw std::invalid_argument("policy");\r
- \r
- boost::unique_future<result_type> future;\r
-\r
- static_assert(sizeof(future) == sizeof(future_object), "");\r
-\r
- reinterpret_cast<boost::shared_ptr<future_object_type>&>(future) = std::move(future_object); // Get around the "private" encapsulation.\r
- return std::move(future);\r
-}\r
- \r
-template<typename F>\r
-auto async(F&& f) -> boost::unique_future<decltype(f())>\r
-{ \r
- return async(launch::async | launch::deferred, std::forward<F>(f));\r
-}\r
-\r
-template<typename T>\r
-auto make_shared(boost::unique_future<T>&& f) -> boost::shared_future<T>\r
-{ \r
- return boost::shared_future<T>(std::move(f));\r
-}\r
-\r
-template<typename T>\r
-auto flatten(boost::unique_future<T>&& f) -> boost::unique_future<decltype(f.get().get())>\r
-{\r
- auto shared_f = make_shared(std::move(f));\r
- return async(launch::deferred, [=]() mutable\r
- {\r
- return shared_f.get().get();\r
- });\r
-}\r
-\r
+#pragma once
+
+#include <boost/thread/mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/optional.hpp>
+
+#include <functional>
+#include <future>
+
+namespace caspar {
+
+template<typename T>
+auto flatten(std::future<T>&& f) -> std::future<typename std::decay<decltype(f.get().get())>::type>
+{
+ auto shared_f = f.share();
+ return std::async(std::launch::deferred, [=]() mutable -> typename std::decay<decltype(f.get().get())>::type
+ {
+ return shared_f.get().get();
+ });
+}
+
+template<typename F>
+bool is_ready(const F& future)
+{
+ return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
+}
+
+/**
+ * A utility that helps the producer side of a future when the task is not
+ * able to complete immediately but there are known retry points in the code.
+ */
+template<class R>
+class retry_task
+{
+public:
+ typedef boost::function<boost::optional<R> ()> func_type;
+
+ retry_task() : done_(false) {}
+
+ /**
+ * Reset the state with a new task. If the previous task has not completed
+ * the old one will be discarded.
+ *
+ * @param func The function that tries to calculate future result. If the
+ * optional return value is set the future is marked as ready.
+ */
+ void set_task(const func_type& func)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ func_ = func;
+ done_ = false;
+ promise_ = std::promise<R>();
+ }
+
+ /**
+ * Take ownership of the future for the current task. Cannot only be called
+ * once for each task.
+ *
+ * @return the future.
+ */
+ std::future<R> get_future()
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ return promise_.get_future();
+ }
+
+ /**
+ * Call this when it is guaranteed or probable that the task will be able
+ * to complete.
+ *
+ * @return true if the task completed (the future will have a result).
+ */
+ bool try_completion()
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ return try_completion_internal();
+ }
+
+ /**
+ * Call this when it is certain that the result should be ready, and if not
+ * it should be regarded as an unrecoverable error (retrying again would
+ * be useless), so the future will be marked as failed.
+ *
+ * @param exception The exception to mark the future with *if* the task
+ * completion fails.
+ */
+ template <class E>
+ void try_or_fail(const E& exception)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ if (!try_completion_internal())
+ {
+ try
+ {
+ throw exception;
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(std::current_exception());
+ done_ = true;
+ }
+ }
+ }
+private:
+ bool try_completion_internal()
+ {
+ if (!func_)
+ return false;
+
+ if (done_)
+ return true;
+
+ boost::optional<R> result;
+
+ try
+ {
+ result = func_();
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(std::current_exception());
+ done_ = true;
+
+ return true;
+ }
+
+ if (result)
+ {
+ promise_.set_value(*result);
+ done_ = true;
+ }
+
+ return done_;
+ }
+private:
+ boost::mutex mutex_;
+ func_type func_;
+ std::promise<R> promise_;
+ bool done_;
+};
+
+/**
+ * Wrap a value in a future with an already known result.
+ * <p>
+ * Useful when the result of an operation is already known at the time of
+ * calling.
+ *
+ * @param value The r-value to wrap.
+ *
+ * @return The future with the result set.
+ */
+template<class R>
+std::future<R> make_ready_future(R&& value)
+{
+ std::promise<R> p;
+
+ p.set_value(value);
+
+ return p.get_future();
+}
+
+static std::future<void> make_ready_future()
+{
+ std::promise<void> p;
+
+ p.set_value();
+
+ return p.get_future();
+}
+
}
\ No newline at end of file