return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
-/**
- * A utility that helps the producer side of a future when the task is not
- * able to complete immediately but there are known retry points in the code.
- */
-template<class R>
-class retry_task
-{
-public:
- typedef boost::function<boost::optional<R> ()> func_type;
-
- retry_task() : done_(false) {}
-
- /**
- * Reset the state with a new task. If the previous task has not completed
- * the old one will be discarded.
- *
- * @param func The function that tries to calculate future result. If the
- * optional return value is set the future is marked as ready.
- */
- void set_task(const func_type& func)
- {
- boost::unique_lock<boost::mutex> lock(mutex_);
-
- func_ = func;
- done_ = false;
- promise_ = std::promise<R>();
- }
-
- /**
- * Take ownership of the future for the current task. Cannot only be called
- * once for each task.
- *
- * @return the future.
- */
- std::future<R> get_future()
- {
- boost::unique_lock<boost::mutex> lock(mutex_);
-
- return promise_.get_future();
- }
-
- /**
- * Call this when it is guaranteed or probable that the task will be able
- * to complete.
- *
- * @return true if the task completed (the future will have a result).
- */
- bool try_completion()
- {
- boost::unique_lock<boost::mutex> lock(mutex_);
-
- return try_completion_internal();
- }
-
- /**
- * Call this when it is certain that the result should be ready, and if not
- * it should be regarded as an unrecoverable error (retrying again would
- * be useless), so the future will be marked as failed.
- *
- * @param exception The exception to mark the future with *if* the task
- * completion fails.
- */
- template <class E>
- void try_or_fail(const E& exception)
- {
- boost::unique_lock<boost::mutex> lock(mutex_);
-
- if (!try_completion_internal())
- {
- try
- {
- throw exception;
- }
- catch (...)
- {
- CASPAR_LOG_CURRENT_EXCEPTION();
- promise_.set_exception(std::current_exception());
- done_ = true;
- }
- }
- }
-private:
- bool try_completion_internal()
- {
- if (!func_)
- return false;
-
- if (done_)
- return true;
-
- boost::optional<R> result;
-
- try
- {
- result = func_();
- }
- catch (...)
- {
- CASPAR_LOG_CURRENT_EXCEPTION();
- promise_.set_exception(std::current_exception());
- done_ = true;
-
- return true;
- }
-
- if (result)
- {
- promise_.set_value(*result);
- done_ = true;
- }
-
- return done_;
- }
-private:
- boost::mutex mutex_;
- func_type func_;
- std::promise<R> promise_;
- bool done_;
-};
-
/**
* Wrap a value in a future with an already known result.
* <p>
#include <boost/circular_buffer.hpp>
#include <boost/property_tree/ptree.hpp>
+#include <future>
+
namespace caspar { namespace decklink {
struct configuration
spl::shared_ptr<diagnostics::graph> graph_;
caspar::timer tick_timer_;
- retry_task<bool> send_completion_;
+ std::packaged_task<bool ()> send_completion_;
reference_signal_detector reference_signal_detector_ { output_ };
tbb::atomic<int64_t> current_presentation_delay_;
tbb::atomic<int64_t> scheduled_frames_completed_;
frame_buffer_.pop(frame);
- send_completion_.try_completion();
+ if (send_completion_.valid())
+ send_completion_();
if (config_.embedded_audio)
schedule_next_audio(channel_remapper_.mix_and_rearrange(frame.audio_data()));
if(!is_running_)
CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(print() + L" Is not running."));
- bool ready = false;
+ if (frame_buffer_.try_push(frame))
+ return make_ready_future(true);
- auto enqueue_task = [ready, frame, this]() mutable -> boost::optional<bool>
+ send_completion_ = std::packaged_task<bool ()>([frame, this] () mutable -> bool
{
- if (!ready)
- ready = frame_buffer_.try_push(frame);
-
- if (ready)
- return true;
- else
- return boost::optional<bool>();
- };
+ frame_buffer_.push(frame);
- send_completion_.set_task(enqueue_task);
- send_completion_.try_completion();
+ return true;
+ });
return send_completion_.get_future();
}