X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=common%2Ffuture.h;h=12c0bfc603dacd78d2810fad32721b4b6622a6ef;hb=bb04ef30e802fa58550a0dec7b54e61aeefb0c7e;hp=7ee9da4fd31b8e2ce6fde8e560f2a67cb078f33f;hpb=58dbff2c53cb764095a301131416080afcbcfbf9;p=casparcg diff --git a/common/future.h b/common/future.h index 7ee9da4fd..12c0bfc60 100644 --- a/common/future.h +++ b/common/future.h @@ -1,185 +1,177 @@ -#pragma once - -#include "enum_class.h" - -#include -#include -#include - -#include - -namespace caspar { - -struct launch_policy_def -{ - enum type - { - async = 1, - deferred = 2 - }; -}; -typedef caspar::enum_class launch; - -namespace detail { - -template -struct future_object_helper -{ - template - static void nonlocking_invoke(T& future_object, F& f) - { - try - { - future_object.mark_finished_with_result_internal(f()); - } - catch(...) - { - future_object.mark_exceptional_finish_internal(boost::current_exception()); - } - } - - template - static void locking_invoke(T& future_object, F& f) - { - try - { - future_object.mark_finished_with_result(f()); - } - catch(...) - { - future_object.mark_exceptional_finish(); - } - } -}; - -template<> -struct future_object_helper -{ - template - static void nonlocking_invoke(T& future_object, F& f) - { - try - { - f(); - future_object.mark_finished_with_result_internal(); - } - catch(...) - { - future_object.mark_exceptional_finish_internal(boost::current_exception()); - } - } - - template - static void locking_invoke(T& future_object, F& f) - { - try - { - f(); - future_object.mark_finished_with_result(); - } - catch(...) - { - future_object.mark_exceptional_finish(); - } - } -}; - -template -struct deferred_future_object : public boost::detail::future_object -{ - F f; - bool done; - - template - deferred_future_object(F2&& f) - : f(std::forward(f)) - , done(false) - { - set_wait_callback(std::mem_fn(&detail::deferred_future_object::operator()), this); - } - - void operator()() - { - boost::lock_guard lock2(mutex); - - if(done) - return; - - future_object_helper::nonlocking_invoke(*this, f); - - done = true; - } -}; - -template -struct async_future_object : public boost::detail::future_object -{ - F f; - boost::thread thread; - - template - async_future_object(F2&& f) - : f(std::forward(f)) - , thread([this]{run();}) - { - } - - ~async_future_object() - { - thread.join(); - } - - void run() - { - future_object_helper::locking_invoke(*this, f); - } -}; - -} - -template -auto async(launch policy, F&& f) -> boost::unique_future -{ - typedef decltype(f()) result_type; - typedef boost::detail::future_object future_object_type; - - boost::shared_ptr future_object; - - if((policy & launch::async) != 0) - future_object = boost::static_pointer_cast(boost::make_shared>(std::forward(f))); - else if((policy & launch::deferred) != 0) - future_object = boost::static_pointer_cast(boost::make_shared>(std::forward(f))); - else - throw std::invalid_argument("policy"); - - boost::unique_future future; - - static_assert(sizeof(future) == sizeof(future_object), ""); - - reinterpret_cast&>(future) = std::move(future_object); // Get around the "private" encapsulation. - return std::move(future); -} - -template -auto async(F&& f) -> boost::unique_future -{ - return async(launch::async | launch::deferred, std::forward(f)); -} - -template -auto make_shared(boost::unique_future&& f) -> boost::shared_future -{ - return boost::shared_future(std::move(f)); -} - -template -auto flatten(boost::unique_future&& f) -> boost::unique_future -{ - auto shared_f = make_shared(std::move(f)); - return async(launch::deferred, [=]() mutable - { - return shared_f.get().get(); - }); -} - +#pragma once + +#include +#include +#include + +#include +#include + +namespace caspar { + +template +auto flatten(std::future&& f) -> std::future::type> +{ + auto shared_f = f.share(); + return std::async(std::launch::deferred, [=]() mutable -> typename std::decay::type + { + return shared_f.get().get(); + }); +} + +template +bool is_ready(const F& future) +{ + return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready; +} + +/** + * A utility that helps the producer side of a future when the task is not + * able to complete immediately but there are known retry points in the code. + */ +template +class retry_task +{ +public: + typedef boost::function ()> func_type; + + retry_task() : done_(false) {} + + /** + * Reset the state with a new task. If the previous task has not completed + * the old one will be discarded. + * + * @param func The function that tries to calculate future result. If the + * optional return value is set the future is marked as ready. + */ + void set_task(const func_type& func) + { + boost::unique_lock lock(mutex_); + + func_ = func; + done_ = false; + promise_ = std::promise(); + } + + /** + * Take ownership of the future for the current task. Cannot only be called + * once for each task. + * + * @return the future. + */ + std::future get_future() + { + boost::unique_lock lock(mutex_); + + return promise_.get_future(); + } + + /** + * Call this when it is guaranteed or probable that the task will be able + * to complete. + * + * @return true if the task completed (the future will have a result). + */ + bool try_completion() + { + boost::unique_lock lock(mutex_); + + return try_completion_internal(); + } + + /** + * Call this when it is certain that the result should be ready, and if not + * it should be regarded as an unrecoverable error (retrying again would + * be useless), so the future will be marked as failed. + * + * @param exception The exception to mark the future with *if* the task + * completion fails. + */ + template + void try_or_fail(const E& exception) + { + boost::unique_lock lock(mutex_); + + if (!try_completion_internal()) + { + try + { + throw exception; + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + promise_.set_exception(std::current_exception()); + done_ = true; + } + } + } +private: + bool try_completion_internal() + { + if (!func_) + return false; + + if (done_) + return true; + + boost::optional result; + + try + { + result = func_(); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + promise_.set_exception(std::current_exception()); + done_ = true; + + return true; + } + + if (result) + { + promise_.set_value(*result); + done_ = true; + } + + return done_; + } +private: + boost::mutex mutex_; + func_type func_; + std::promise promise_; + bool done_; +}; + +/** + * Wrap a value in a future with an already known result. + *

+ * Useful when the result of an operation is already known at the time of + * calling. + * + * @param value The r-value to wrap. + * + * @return The future with the result set. + */ +template +std::future make_ready_future(R&& value) +{ + std::promise p; + + p.set_value(value); + + return p.get_future(); +} + +static std::future make_ready_future() +{ + std::promise p; + + p.set_value(); + + return p.get_future(); +} + } \ No newline at end of file