]> git.sesse.net Git - casparcg/commitdiff
Merged asynchronous invocation of consumers from 2.0
authorhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 10 Sep 2012 16:05:18 +0000 (16:05 +0000)
committerhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 10 Sep 2012 16:05:18 +0000 (16:05 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3280 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

12 files changed:
common/future.h
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/consumer/output.cpp
core/consumer/port.cpp
core/consumer/port.h
modules/bluefish/consumer/bluefish_consumer.cpp
modules/decklink/consumer/decklink_consumer.cpp
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/image/consumer/image_consumer.cpp
modules/oal/consumer/oal_consumer.cpp
modules/screen/consumer/screen_consumer.cpp

index 7789ab871a6f388e1cd889b96ee25ff00b9ce3cf..21297180d59629bf3181cd461b79a0c57a734a05 100644 (file)
@@ -188,4 +188,136 @@ auto flatten(boost::unique_future<T>&& f) -> boost::unique_future<decltype(f.get
        });
 }
 
+/**
+ * A utility that helps the producer side of a future when the task is not
+ * able to complete immediately but there are known retry points in the code.
+ */
+template<class R>
+class retry_task
+{
+public:
+       typedef boost::function<boost::optional<R> ()> func_type;
+       
+       retry_task() : done_(false) {}
+
+       /**
+        * Reset the state with a new task. If the previous task has not completed
+        * the old one will be discarded.
+        *
+        * @param func The function that tries to calculate future result. If the
+        *             optional return value is set the future is marked as ready.
+        */
+       void set_task(const func_type& func)
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               func_ = func;
+               done_ = false;
+               promise_ = boost::promise<R>();
+       }
+
+       /**
+        * Take ownership of the future for the current task. Cannot only be called
+        * once for each task.
+        *
+        * @return the future.
+        */
+       boost::unique_future<R> get_future()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               return promise_.get_future();
+       }
+
+       /**
+        * Call this when it is guaranteed or probable that the task will be able
+        * to complete.
+        *
+        * @return true if the task completed (the future will have a result).
+        */
+       bool try_completion()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               if (!func_)
+                       return false;
+
+               if (done_)
+                       return true;
+
+               boost::optional<R> result;
+
+               try
+               {
+                       result = func_();
+               }
+               catch (...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       promise_.set_exception(boost::current_exception());
+                       done_ = true;
+
+                       return true;
+               }
+
+               if (result)
+               {
+                       promise_.set_value(*result);
+                       done_ = true;
+               }
+
+               return done_;
+       }
+
+       /**
+        * Call this when it is certain that the result should be ready, and if not
+        * it should be regarded as an unrecoverable error (retrying again would
+        * be useless), so the future will be marked as failed.
+        *
+        * @param exception The exception to mark the future with *if* the task
+        *                  completion fails.
+        */
+       void try_or_fail(const std::exception& exception)
+       {
+               if (!try_completion())
+               {
+                       try
+                       {
+                               throw exception;
+                       }
+                       catch (...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                               promise_.set_exception(boost::current_exception());
+                               done_ = true;
+                       }
+               }
+       }
+private:
+       boost::mutex mutex_;
+       func_type func_;
+       boost::promise<R> promise_;
+       bool done_;
+};
+
+/**
+ * Wrap a value in a future with an already known result.
+ * <p>
+ * Useful when the result of an operation is already known at the time of
+ * calling.
+ *
+ * @param value The r-value to wrap.
+ *
+ * @return The future with the result set.
+ */
+template<class R>
+boost::unique_future<R> wrap_as_future(R&& value)
+{
+       boost::promise<R> p;
+
+       p.set_value(value);
+
+       return p.get_future();
+}
+
 }
\ No newline at end of file
index 72aa407303a11cf2d26e4edfd4de17095ad1018b..b7da6ce5e2eca5cc14ff069bf842e5968b89aa9a 100644 (file)
@@ -24,6 +24,7 @@
 #include "frame_consumer.h"
 
 #include <common/except.h>
+#include <common/future.h>
 
 #include <core/video_format.h>
 #include <core/frame/frame.h>
@@ -76,7 +77,7 @@ public:
                }).detach(); 
        }
        
-       bool send(const_frame frame) override                                                                                                                           {return consumer_->send(std::move(frame));}
+       boost::unique_future<bool> send(const_frame frame) override                                                                                     {return consumer_->send(std::move(frame));}
        virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}
        std::wstring print() const override                                                                                                                                     {return consumer_->print();}    
        std::wstring name() const override                                                                                                                                      {return consumer_->name();}
@@ -106,7 +107,7 @@ public:
                CASPAR_LOG(info) << str << L" Uninitialized.";
        }
        
-       bool send(const_frame frame) override                                                                                                   {return consumer_->send(std::move(frame));}
+       boost::unique_future<bool> send(const_frame frame) override                                                                     {return consumer_->send(std::move(frame));}
        virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override        {return consumer_->initialize(format_desc, channel_index);}
        std::wstring print() const override                                                                                                                     {return consumer_->print();}
        std::wstring name() const override                                                                                                                      {return consumer_->name();}
@@ -129,7 +130,7 @@ public:
        {
        }
        
-       virtual bool send(const_frame frame)                                    
+       virtual boost::unique_future<bool> send(const_frame frame)                                      
        {
                try
                {
@@ -147,7 +148,7 @@ public:
                        {
                                CASPAR_LOG_CURRENT_EXCEPTION();
                                CASPAR_LOG(error) << print() << " Failed to recover consumer.";
-                               return false;
+                               return wrap_as_future(false);
                        }
                }
        }
@@ -188,12 +189,12 @@ public:
                consumer_->initialize(format_desc, channel_index);
        }
 
-       bool send(const_frame frame) override
+       boost::unique_future<bool> send(const_frame frame) override
        {               
                if(audio_cadence_.size() == 1)
                        return consumer_->send(frame);
 
-               bool result = true;
+               boost::unique_future<bool> result = wrap_as_future(true);
                
                if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame.audio_data().size())) 
                {       
@@ -206,7 +207,7 @@ public:
 
                sync_buffer_.push_back(static_cast<int>(frame.audio_data().size()));
                
-               return result;
+               return std::move(result);
        }
        
        std::wstring print() const override                                                                             {return consumer_->print();}
@@ -253,7 +254,7 @@ const spl::shared_ptr<frame_consumer>& frame_consumer::empty()
        class empty_frame_consumer : public frame_consumer
        {
        public:
-               bool send(const_frame) override {return false;}
+               boost::unique_future<bool> send(const_frame) override {return wrap_as_future(false);}
                void initialize(const video_format_desc&, int) override{}
                std::wstring print() const override {return L"empty";}
                std::wstring name() const override {return L"empty";}
index 9fe65434339b9a22246c11b7fc5eaed6d48c3063..9288b2a487094070c35701d6bbb121e808bccfb7 100644 (file)
 #include <common/memory.h>
 
 #include <boost/property_tree/ptree_fwd.hpp>
+#include <boost/thread/future.hpp>
 
 #include <functional>
 #include <string>
 #include <vector>
 
 namespace caspar { namespace core {
-       
+
 // Interface
 class frame_consumer : public monitor::observable
 {
@@ -51,7 +52,7 @@ public:
        
        // Methods
 
-       virtual bool                                                    send(class const_frame frame) = 0;
+       virtual boost::unique_future<bool>              send(class const_frame frame) = 0;
        virtual void                                                    initialize(const struct video_format_desc& format_desc, int channel_index) = 0;
        
        // monitor::observable
index 43aa68f8f40b038ca3d5ca7c062f69e762d2168c..f6aaf6d96e1d2760488787fb4c42032e329543fd 100644 (file)
@@ -177,22 +177,37 @@ public:
                        if(!frames_.full())
                                return;
 
-                       for(auto it = ports_.begin(); it != ports_.end();)
+                       std::map<int, boost::unique_future<bool>> send_results;
+
+                       // Start invocations
+                       for (auto it = ports_.begin(); it != ports_.end(); ++it)
                        {
                                auto& port      = it->second;
                                auto& frame     = frames_.at(port.buffer_depth()-minmax.first);
                                        
                                try
                                {
-                                       if(port.send(frame))
-                                               ++it;
-                                       else
-                                               ports_.erase(it++);                                     
+                                       send_results.insert(std::make_pair(it->first, port.send(frame)));
                                }
-                               catch(...)
+                               catch (...)
                                {
                                        CASPAR_LOG_CURRENT_EXCEPTION();
-                                       ports_.erase(it++);
+                                       ports_.erase(it);
+                               }
+                       }
+
+                       // Retrieve results
+                       for (auto it = send_results.begin(); it != send_results.end(); ++it)
+                       {
+                               try
+                               {
+                                       if (!it->second.get())
+                                               ports_.erase(it->first);
+                               }
+                               catch (...)
+                               {
+                                       CASPAR_LOG_CURRENT_EXCEPTION();
+                                       ports_.erase(it->first);
                                }
                        }
                });
index ca2b063dd0a90bacd1fad794b82d433ef005459c..652690cb45aa4d883b5b3c5ce8ba8d2d3dab4382 100644 (file)
@@ -28,7 +28,7 @@ public:
                consumer_->initialize(format_desc, channel_index_);
        }
                
-       bool send(const_frame frame)
+       boost::unique_future<bool> send(const_frame frame)
        {
                event_subject_ << monitor::event("type") % consumer_->name();
                return consumer_->send(std::move(frame));
@@ -59,7 +59,7 @@ port::port(int index, int channel_index, spl::shared_ptr<frame_consumer> consume
 port::port(port&& other) : impl_(std::move(other.impl_)){}
 port::~port(){}
 port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;}
-bool port::send(const_frame frame){return impl_->send(std::move(frame));}      
+boost::unique_future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));}        
 void port::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);}
 void port::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);}
 void port::video_format_desc(const struct video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
index 76ff625df731a67df5ec8a8f966aec33f9c6741c..de21c520ce7d29a9d951f62001cbdd4373ceb71d 100644 (file)
@@ -5,6 +5,7 @@
 #include <common/memory.h>
 
 #include <boost/property_tree/ptree_fwd.hpp>
+#include <boost/thread/future.hpp>
 
 namespace caspar { namespace core {
 
@@ -26,7 +27,7 @@ public:
 
        port& operator=(port&& other);
 
-       bool send(class const_frame frame);     
+       boost::unique_future<bool> send(class const_frame frame);       
 
        // monitor::observable
        
index 9a2c5588582cb086ac51c33688b927334a399d86..cbe48d10469d461559b95edcded08f08582b8ce8 100644 (file)
@@ -186,9 +186,9 @@ public:
                        CASPAR_LOG(error)<< print() << TEXT(" Failed to disable video output.");                
        }
        
-       void send(core::const_frame& frame)
+       boost::unique_future<bool> send(core::const_frame& frame)
        {                                       
-               executor_.begin_invoke([=]
+               return executor_.begin_invoke([=]() -> bool
                {
                        try
                        {       
@@ -200,6 +200,8 @@ public:
                        {
                                CASPAR_LOG_CURRENT_EXCEPTION();
                        }
+
+                       return true;
                });
        }
 
@@ -317,10 +319,9 @@ public:
                consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_, channel_index));
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {
-               consumer_->send(frame);
-               return true;
+               return consumer_->send(frame);
        }
                
        std::wstring print() const override
index 73a39432aa5e3b358e8aaf22b3a26ea567f728c9..97043a03f51de99f30b7d000e1a8558166aee3b6 100644 (file)
@@ -36,6 +36,7 @@
 #include <common/except.h>
 #include <common/memshfl.h>
 #include <common/array.h>
+#include <common/future.h>
 
 #include <core/consumer/frame_consumer.h>
 
@@ -206,6 +207,7 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLink
        
        spl::shared_ptr<diagnostics::graph> graph_;
        boost::timer tick_timer_;
+       retry_task<bool> send_completion_;
 
 public:
        decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) 
@@ -374,7 +376,8 @@ public:
                                graph_->set_tag("flushed-frame");
 
                        auto frame = core::const_frame::empty();        
-                       video_frame_buffer_.pop(frame);                                 
+                       video_frame_buffer_.pop(frame);
+                       send_completion_.try_completion();
                        schedule_next_video(frame);     
                        
                        unsigned long buffered;
@@ -414,6 +417,7 @@ public:
                        {
                                auto frame = core::const_frame::empty();
                                audio_frame_buffer_.pop(frame);
+                               send_completion_.try_completion();
                                schedule_next_audio(frame.audio_data());
                        }
 
@@ -456,7 +460,7 @@ public:
                tick_timer_.restart();
        }
 
-       void send(core::const_frame frame)
+       boost::unique_future<bool> send(core::const_frame frame)
        {
                auto exception = lock(exception_mutex_, [&]
                {
@@ -469,9 +473,29 @@ public:
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(u8(print()) + " Is not running."));
                
-               if(config_.embedded_audio)
-                       audio_frame_buffer_.push(frame);        
-               video_frame_buffer_.push(frame);        
+               bool audio_ready = !config_.embedded_audio;
+               bool video_ready = false;
+
+               auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>
+               {
+                       if (!audio_ready)
+                               audio_ready = audio_frame_buffer_.try_push(frame);
+
+                       if (!video_ready)
+                               video_ready = video_frame_buffer_.try_push(frame);
+
+                       if (audio_ready && video_ready)
+                               return true;
+                       else
+                               return boost::optional<bool>();
+               };
+               
+               if (enqueue_task())
+                       return wrap_as_future(true);
+
+               send_completion_.set_task(enqueue_task);
+
+               return send_completion_.get_future();
        }
        
        std::wstring print() const
@@ -517,10 +541,9 @@ public:
                });
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {
-               consumer_->send(frame);
-               return true;
+               return consumer_->send(frame);
        }
        
        std::wstring print() const override
index eff99a64a26057e8bed0d0cdb2e69bddf4a3601c..243c0bf9e54d95f8843a18c8dd5a2cf8ee8170f6 100644 (file)
@@ -347,7 +347,7 @@ public:
        
        // frame_consumer
 
-       bool send(core::const_frame& frame)
+       boost::unique_future<bool> send(core::const_frame& frame)
        {
                auto exception = lock(exception_mutex_, [&]
                {
@@ -357,12 +357,12 @@ public:
                if(exception != nullptr)
                        std::rethrow_exception(exception);
                        
-               executor_.begin_invoke([=]
+               return executor_.begin_invoke([=]() -> bool
                {               
                        encode(frame);
+
+                       return true;
                });
-               
-               return true;
        }
 
        std::wstring print() const
@@ -744,7 +744,7 @@ public:
                consumer_.reset(new ffmpeg_consumer(u8(filename_), format_desc, options_));
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {
                return consumer_->send(frame);
        }
index b5b3767df78b804f9a914406cf0656c65aee0ea5..579ec876d5edb38fc5de2a6057404d15eb4600c2 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/log.h>
 #include <common/utf.h>
 #include <common/array.h>
+#include <common/future.h>
 
 #include <core/consumer/frame_consumer.h>
 #include <core/video_format.h>
@@ -54,7 +55,7 @@ public:
        {
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {                               
                boost::thread async([frame]
                {
@@ -74,7 +75,7 @@ public:
                });
                async.detach();
 
-               return false;
+               return wrap_as_future(false);
        }
 
        std::wstring print() const override
index e65ef37ca4fa526128380f41a98f0099e1e01b32..e0084ee3e384a8f4b68c340b713a5cf25efeced1 100644 (file)
@@ -27,6 +27,7 @@
 #include <common/log.h>
 #include <common/utf.h>
 #include <common/env.h>
+#include <common/future.h>
 
 #include <core/consumer/frame_consumer.h>
 #include <core/frame/frame.h>
@@ -174,8 +175,10 @@ public:
                });
        }
        
-       bool send(core::const_frame frame) override
-       {                       
+       boost::unique_future<bool> send(core::const_frame frame) override
+       {
+               // Will only block if the default executor queue capacity of 512 is
+               // exhausted, which should not happen
                executor_.begin_invoke([=]
                {
                        ALenum state; 
@@ -213,7 +216,7 @@ public:
                        perf_timer_.restart();
                });
 
-               return true;
+               return wrap_as_future(true);
        }
        
        std::wstring print() const override
index 45b5b8a30bcf90aba773e7abd093c823dbfd9b6f..a14ed2d12cfecdc07e7405b9328b6da248af4a90 100644 (file)
@@ -32,6 +32,7 @@
 #include <common/memshfl.h>
 #include <common/utf.h>
 #include <common/prec_timer.h>
+#include <common/future.h>
 
 #include <ffmpeg/producer/filter/filter.h>
 
@@ -443,11 +444,12 @@ public:
        }
 
 
-       bool send(core::const_frame frame)
+       boost::unique_future<bool> send(core::const_frame frame)
        {
                if(!frame_buffer_.try_push(frame))
                        graph_->set_tag("dropped-frame");
-               return is_running_;
+
+               return wrap_as_future(is_running_.load());
        }
                
        std::wstring print() const
@@ -531,7 +533,7 @@ public:
                consumer_.reset(new screen_consumer(config_, format_desc, channel_index));
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {
                return consumer_->send(frame);
        }