From: hellgore Date: Mon, 10 Sep 2012 16:05:18 +0000 (+0000) Subject: Merged asynchronous invocation of consumers from 2.0 X-Git-Tag: 2.1.0_Beta1~498 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=52c5223f43ad44f8990608a65778e65b021aa4ff;p=casparcg Merged asynchronous invocation of consumers from 2.0 git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3280 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- diff --git a/common/future.h b/common/future.h index 7789ab871..21297180d 100644 --- a/common/future.h +++ b/common/future.h @@ -188,4 +188,136 @@ auto flatten(boost::unique_future&& f) -> boost::unique_future +class retry_task +{ +public: + typedef boost::function ()> func_type; + + retry_task() : done_(false) {} + + /** + * Reset the state with a new task. If the previous task has not completed + * the old one will be discarded. + * + * @param func The function that tries to calculate future result. If the + * optional return value is set the future is marked as ready. + */ + void set_task(const func_type& func) + { + boost::mutex::scoped_lock lock(mutex_); + + func_ = func; + done_ = false; + promise_ = boost::promise(); + } + + /** + * Take ownership of the future for the current task. Cannot only be called + * once for each task. + * + * @return the future. + */ + boost::unique_future get_future() + { + boost::mutex::scoped_lock lock(mutex_); + + return promise_.get_future(); + } + + /** + * Call this when it is guaranteed or probable that the task will be able + * to complete. + * + * @return true if the task completed (the future will have a result). + */ + bool try_completion() + { + boost::mutex::scoped_lock lock(mutex_); + + if (!func_) + return false; + + if (done_) + return true; + + boost::optional result; + + try + { + result = func_(); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + promise_.set_exception(boost::current_exception()); + done_ = true; + + return true; + } + + if (result) + { + promise_.set_value(*result); + done_ = true; + } + + return done_; + } + + /** + * Call this when it is certain that the result should be ready, and if not + * it should be regarded as an unrecoverable error (retrying again would + * be useless), so the future will be marked as failed. + * + * @param exception The exception to mark the future with *if* the task + * completion fails. + */ + void try_or_fail(const std::exception& exception) + { + if (!try_completion()) + { + try + { + throw exception; + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + promise_.set_exception(boost::current_exception()); + done_ = true; + } + } + } +private: + boost::mutex mutex_; + func_type func_; + boost::promise promise_; + bool done_; +}; + +/** + * Wrap a value in a future with an already known result. + *

+ * Useful when the result of an operation is already known at the time of + * calling. + * + * @param value The r-value to wrap. + * + * @return The future with the result set. + */ +template +boost::unique_future wrap_as_future(R&& value) +{ + boost::promise p; + + p.set_value(value); + + return p.get_future(); +} + } \ No newline at end of file diff --git a/core/consumer/frame_consumer.cpp b/core/consumer/frame_consumer.cpp index 72aa40730..b7da6ce5e 100644 --- a/core/consumer/frame_consumer.cpp +++ b/core/consumer/frame_consumer.cpp @@ -24,6 +24,7 @@ #include "frame_consumer.h" #include +#include #include #include @@ -76,7 +77,7 @@ public: }).detach(); } - bool send(const_frame frame) override {return consumer_->send(std::move(frame));} + boost::unique_future send(const_frame frame) override {return consumer_->send(std::move(frame));} virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);} std::wstring print() const override {return consumer_->print();} std::wstring name() const override {return consumer_->name();} @@ -106,7 +107,7 @@ public: CASPAR_LOG(info) << str << L" Uninitialized."; } - bool send(const_frame frame) override {return consumer_->send(std::move(frame));} + boost::unique_future send(const_frame frame) override {return consumer_->send(std::move(frame));} virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);} std::wstring print() const override {return consumer_->print();} std::wstring name() const override {return consumer_->name();} @@ -129,7 +130,7 @@ public: { } - virtual bool send(const_frame frame) + virtual boost::unique_future send(const_frame frame) { try { @@ -147,7 +148,7 @@ public: { CASPAR_LOG_CURRENT_EXCEPTION(); CASPAR_LOG(error) << print() << " Failed to recover consumer."; - return false; + return wrap_as_future(false); } } } @@ -188,12 +189,12 @@ public: consumer_->initialize(format_desc, channel_index); } - bool send(const_frame frame) override + boost::unique_future send(const_frame frame) override { if(audio_cadence_.size() == 1) return consumer_->send(frame); - bool result = true; + boost::unique_future result = wrap_as_future(true); if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast(frame.audio_data().size())) { @@ -206,7 +207,7 @@ public: sync_buffer_.push_back(static_cast(frame.audio_data().size())); - return result; + return std::move(result); } std::wstring print() const override {return consumer_->print();} @@ -253,7 +254,7 @@ const spl::shared_ptr& frame_consumer::empty() class empty_frame_consumer : public frame_consumer { public: - bool send(const_frame) override {return false;} + boost::unique_future send(const_frame) override {return wrap_as_future(false);} void initialize(const video_format_desc&, int) override{} std::wstring print() const override {return L"empty";} std::wstring name() const override {return L"empty";} diff --git a/core/consumer/frame_consumer.h b/core/consumer/frame_consumer.h index 9fe654343..9288b2a48 100644 --- a/core/consumer/frame_consumer.h +++ b/core/consumer/frame_consumer.h @@ -26,13 +26,14 @@ #include #include +#include #include #include #include namespace caspar { namespace core { - + // Interface class frame_consumer : public monitor::observable { @@ -51,7 +52,7 @@ public: // Methods - virtual bool send(class const_frame frame) = 0; + virtual boost::unique_future send(class const_frame frame) = 0; virtual void initialize(const struct video_format_desc& format_desc, int channel_index) = 0; // monitor::observable diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index 43aa68f8f..f6aaf6d96 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -177,22 +177,37 @@ public: if(!frames_.full()) return; - for(auto it = ports_.begin(); it != ports_.end();) + std::map> send_results; + + // Start invocations + for (auto it = ports_.begin(); it != ports_.end(); ++it) { auto& port = it->second; auto& frame = frames_.at(port.buffer_depth()-minmax.first); try { - if(port.send(frame)) - ++it; - else - ports_.erase(it++); + send_results.insert(std::make_pair(it->first, port.send(frame))); } - catch(...) + catch (...) { CASPAR_LOG_CURRENT_EXCEPTION(); - ports_.erase(it++); + ports_.erase(it); + } + } + + // Retrieve results + for (auto it = send_results.begin(); it != send_results.end(); ++it) + { + try + { + if (!it->second.get()) + ports_.erase(it->first); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + ports_.erase(it->first); } } }); diff --git a/core/consumer/port.cpp b/core/consumer/port.cpp index ca2b063dd..652690cb4 100644 --- a/core/consumer/port.cpp +++ b/core/consumer/port.cpp @@ -28,7 +28,7 @@ public: consumer_->initialize(format_desc, channel_index_); } - bool send(const_frame frame) + boost::unique_future send(const_frame frame) { event_subject_ << monitor::event("type") % consumer_->name(); return consumer_->send(std::move(frame)); @@ -59,7 +59,7 @@ port::port(int index, int channel_index, spl::shared_ptr consume port::port(port&& other) : impl_(std::move(other.impl_)){} port::~port(){} port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;} -bool port::send(const_frame frame){return impl_->send(std::move(frame));} +boost::unique_future port::send(const_frame frame){return impl_->send(std::move(frame));} void port::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);} void port::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);} void port::video_format_desc(const struct video_format_desc& format_desc){impl_->video_format_desc(format_desc);} diff --git a/core/consumer/port.h b/core/consumer/port.h index 76ff625df..de21c520c 100644 --- a/core/consumer/port.h +++ b/core/consumer/port.h @@ -5,6 +5,7 @@ #include #include +#include namespace caspar { namespace core { @@ -26,7 +27,7 @@ public: port& operator=(port&& other); - bool send(class const_frame frame); + boost::unique_future send(class const_frame frame); // monitor::observable diff --git a/modules/bluefish/consumer/bluefish_consumer.cpp b/modules/bluefish/consumer/bluefish_consumer.cpp index 9a2c55885..cbe48d104 100644 --- a/modules/bluefish/consumer/bluefish_consumer.cpp +++ b/modules/bluefish/consumer/bluefish_consumer.cpp @@ -186,9 +186,9 @@ public: CASPAR_LOG(error)<< print() << TEXT(" Failed to disable video output."); } - void send(core::const_frame& frame) + boost::unique_future send(core::const_frame& frame) { - executor_.begin_invoke([=] + return executor_.begin_invoke([=]() -> bool { try { @@ -200,6 +200,8 @@ public: { CASPAR_LOG_CURRENT_EXCEPTION(); } + + return true; }); } @@ -317,10 +319,9 @@ public: consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_, channel_index)); } - bool send(core::const_frame frame) override + boost::unique_future send(core::const_frame frame) override { - consumer_->send(frame); - return true; + return consumer_->send(frame); } std::wstring print() const override diff --git a/modules/decklink/consumer/decklink_consumer.cpp b/modules/decklink/consumer/decklink_consumer.cpp index 73a39432a..97043a03f 100644 --- a/modules/decklink/consumer/decklink_consumer.cpp +++ b/modules/decklink/consumer/decklink_consumer.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include @@ -206,6 +207,7 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLink spl::shared_ptr graph_; boost::timer tick_timer_; + retry_task send_completion_; public: decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) @@ -374,7 +376,8 @@ public: graph_->set_tag("flushed-frame"); auto frame = core::const_frame::empty(); - video_frame_buffer_.pop(frame); + video_frame_buffer_.pop(frame); + send_completion_.try_completion(); schedule_next_video(frame); unsigned long buffered; @@ -414,6 +417,7 @@ public: { auto frame = core::const_frame::empty(); audio_frame_buffer_.pop(frame); + send_completion_.try_completion(); schedule_next_audio(frame.audio_data()); } @@ -456,7 +460,7 @@ public: tick_timer_.restart(); } - void send(core::const_frame frame) + boost::unique_future send(core::const_frame frame) { auto exception = lock(exception_mutex_, [&] { @@ -469,9 +473,29 @@ public: if(!is_running_) CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(u8(print()) + " Is not running.")); - if(config_.embedded_audio) - audio_frame_buffer_.push(frame); - video_frame_buffer_.push(frame); + bool audio_ready = !config_.embedded_audio; + bool video_ready = false; + + auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional + { + if (!audio_ready) + audio_ready = audio_frame_buffer_.try_push(frame); + + if (!video_ready) + video_ready = video_frame_buffer_.try_push(frame); + + if (audio_ready && video_ready) + return true; + else + return boost::optional(); + }; + + if (enqueue_task()) + return wrap_as_future(true); + + send_completion_.set_task(enqueue_task); + + return send_completion_.get_future(); } std::wstring print() const @@ -517,10 +541,9 @@ public: }); } - bool send(core::const_frame frame) override + boost::unique_future send(core::const_frame frame) override { - consumer_->send(frame); - return true; + return consumer_->send(frame); } std::wstring print() const override diff --git a/modules/ffmpeg/consumer/ffmpeg_consumer.cpp b/modules/ffmpeg/consumer/ffmpeg_consumer.cpp index eff99a64a..243c0bf9e 100644 --- a/modules/ffmpeg/consumer/ffmpeg_consumer.cpp +++ b/modules/ffmpeg/consumer/ffmpeg_consumer.cpp @@ -347,7 +347,7 @@ public: // frame_consumer - bool send(core::const_frame& frame) + boost::unique_future send(core::const_frame& frame) { auto exception = lock(exception_mutex_, [&] { @@ -357,12 +357,12 @@ public: if(exception != nullptr) std::rethrow_exception(exception); - executor_.begin_invoke([=] + return executor_.begin_invoke([=]() -> bool { encode(frame); + + return true; }); - - return true; } std::wstring print() const @@ -744,7 +744,7 @@ public: consumer_.reset(new ffmpeg_consumer(u8(filename_), format_desc, options_)); } - bool send(core::const_frame frame) override + boost::unique_future send(core::const_frame frame) override { return consumer_->send(frame); } diff --git a/modules/image/consumer/image_consumer.cpp b/modules/image/consumer/image_consumer.cpp index b5b3767df..579ec876d 100644 --- a/modules/image/consumer/image_consumer.cpp +++ b/modules/image/consumer/image_consumer.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -54,7 +55,7 @@ public: { } - bool send(core::const_frame frame) override + boost::unique_future send(core::const_frame frame) override { boost::thread async([frame] { @@ -74,7 +75,7 @@ public: }); async.detach(); - return false; + return wrap_as_future(false); } std::wstring print() const override diff --git a/modules/oal/consumer/oal_consumer.cpp b/modules/oal/consumer/oal_consumer.cpp index e65ef37ca..e0084ee3e 100644 --- a/modules/oal/consumer/oal_consumer.cpp +++ b/modules/oal/consumer/oal_consumer.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -174,8 +175,10 @@ public: }); } - bool send(core::const_frame frame) override - { + boost::unique_future send(core::const_frame frame) override + { + // Will only block if the default executor queue capacity of 512 is + // exhausted, which should not happen executor_.begin_invoke([=] { ALenum state; @@ -213,7 +216,7 @@ public: perf_timer_.restart(); }); - return true; + return wrap_as_future(true); } std::wstring print() const override diff --git a/modules/screen/consumer/screen_consumer.cpp b/modules/screen/consumer/screen_consumer.cpp index 45b5b8a30..a14ed2d12 100644 --- a/modules/screen/consumer/screen_consumer.cpp +++ b/modules/screen/consumer/screen_consumer.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include @@ -443,11 +444,12 @@ public: } - bool send(core::const_frame frame) + boost::unique_future send(core::const_frame frame) { if(!frame_buffer_.try_push(frame)) graph_->set_tag("dropped-frame"); - return is_running_; + + return wrap_as_future(is_running_.load()); } std::wstring print() const @@ -531,7 +533,7 @@ public: consumer_.reset(new screen_consumer(config_, format_desc, channel_index)); } - bool send(core::const_frame frame) override + boost::unique_future send(core::const_frame frame) override { return consumer_->send(frame); }