From f529ebe9cc9c0a58a6ba4a1ebcd377b4dabf830a Mon Sep 17 00:00:00 2001 From: Helge Norberg Date: Mon, 11 Jul 2016 21:11:20 +0200 Subject: [PATCH] No more use for retry_task in decklink_consumer now since audio and video are completed in the same callback. std::packaged_task is simpler to use now. --- common/future.h | 120 ------------------ .../decklink/consumer/decklink_consumer.cpp | 25 ++-- 2 files changed, 11 insertions(+), 134 deletions(-) diff --git a/common/future.h b/common/future.h index 12c0bfc60..65bea20b5 100644 --- a/common/future.h +++ b/common/future.h @@ -25,126 +25,6 @@ bool is_ready(const F& future) return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } -/** - * A utility that helps the producer side of a future when the task is not - * able to complete immediately but there are known retry points in the code. - */ -template -class retry_task -{ -public: - typedef boost::function ()> func_type; - - retry_task() : done_(false) {} - - /** - * Reset the state with a new task. If the previous task has not completed - * the old one will be discarded. - * - * @param func The function that tries to calculate future result. If the - * optional return value is set the future is marked as ready. - */ - void set_task(const func_type& func) - { - boost::unique_lock lock(mutex_); - - func_ = func; - done_ = false; - promise_ = std::promise(); - } - - /** - * Take ownership of the future for the current task. Cannot only be called - * once for each task. - * - * @return the future. - */ - std::future get_future() - { - boost::unique_lock lock(mutex_); - - return promise_.get_future(); - } - - /** - * Call this when it is guaranteed or probable that the task will be able - * to complete. - * - * @return true if the task completed (the future will have a result). - */ - bool try_completion() - { - boost::unique_lock lock(mutex_); - - return try_completion_internal(); - } - - /** - * Call this when it is certain that the result should be ready, and if not - * it should be regarded as an unrecoverable error (retrying again would - * be useless), so the future will be marked as failed. - * - * @param exception The exception to mark the future with *if* the task - * completion fails. - */ - template - void try_or_fail(const E& exception) - { - boost::unique_lock lock(mutex_); - - if (!try_completion_internal()) - { - try - { - throw exception; - } - catch (...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - promise_.set_exception(std::current_exception()); - done_ = true; - } - } - } -private: - bool try_completion_internal() - { - if (!func_) - return false; - - if (done_) - return true; - - boost::optional result; - - try - { - result = func_(); - } - catch (...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - promise_.set_exception(std::current_exception()); - done_ = true; - - return true; - } - - if (result) - { - promise_.set_value(*result); - done_ = true; - } - - return done_; - } -private: - boost::mutex mutex_; - func_type func_; - std::promise promise_; - bool done_; -}; - /** * Wrap a value in a future with an already known result. *

diff --git a/modules/decklink/consumer/decklink_consumer.cpp b/modules/decklink/consumer/decklink_consumer.cpp index ca44c82c9..37dfeea1b 100644 --- a/modules/decklink/consumer/decklink_consumer.cpp +++ b/modules/decklink/consumer/decklink_consumer.cpp @@ -57,6 +57,8 @@ #include #include +#include + namespace caspar { namespace decklink { struct configuration @@ -390,7 +392,7 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, boost::noncopyab spl::shared_ptr graph_; caspar::timer tick_timer_; - retry_task send_completion_; + std::packaged_task send_completion_; reference_signal_detector reference_signal_detector_ { output_ }; tbb::atomic current_presentation_delay_; tbb::atomic scheduled_frames_completed_; @@ -566,7 +568,8 @@ public: frame_buffer_.pop(frame); - send_completion_.try_completion(); + if (send_completion_.valid()) + send_completion_(); if (config_.embedded_audio) schedule_next_audio(channel_remapper_.mix_and_rearrange(frame.audio_data())); @@ -627,21 +630,15 @@ public: if(!is_running_) CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(print() + L" Is not running.")); - bool ready = false; + if (frame_buffer_.try_push(frame)) + return make_ready_future(true); - auto enqueue_task = [ready, frame, this]() mutable -> boost::optional + send_completion_ = std::packaged_task([frame, this] () mutable -> bool { - if (!ready) - ready = frame_buffer_.try_push(frame); - - if (ready) - return true; - else - return boost::optional(); - }; + frame_buffer_.push(frame); - send_completion_.set_task(enqueue_task); - send_completion_.try_completion(); + return true; + }); return send_completion_.get_future(); } -- 2.39.2