]> git.sesse.net Git - casparcg/blobdiff - modules/decklink/consumer/decklink_consumer.cpp
Merged asynchronous invocation of consumers from 2.0
[casparcg] / modules / decklink / consumer / decklink_consumer.cpp
index 73a39432aa5e3b358e8aaf22b3a26ea567f728c9..97043a03f51de99f30b7d000e1a8558166aee3b6 100644 (file)
@@ -36,6 +36,7 @@
 #include <common/except.h>
 #include <common/memshfl.h>
 #include <common/array.h>
+#include <common/future.h>
 
 #include <core/consumer/frame_consumer.h>
 
@@ -206,6 +207,7 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLink
        
        spl::shared_ptr<diagnostics::graph> graph_;
        boost::timer tick_timer_;
+       retry_task<bool> send_completion_;
 
 public:
        decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) 
@@ -374,7 +376,8 @@ public:
                                graph_->set_tag("flushed-frame");
 
                        auto frame = core::const_frame::empty();        
-                       video_frame_buffer_.pop(frame);                                 
+                       video_frame_buffer_.pop(frame);
+                       send_completion_.try_completion();
                        schedule_next_video(frame);     
                        
                        unsigned long buffered;
@@ -414,6 +417,7 @@ public:
                        {
                                auto frame = core::const_frame::empty();
                                audio_frame_buffer_.pop(frame);
+                               send_completion_.try_completion();
                                schedule_next_audio(frame.audio_data());
                        }
 
@@ -456,7 +460,7 @@ public:
                tick_timer_.restart();
        }
 
-       void send(core::const_frame frame)
+       boost::unique_future<bool> send(core::const_frame frame)
        {
                auto exception = lock(exception_mutex_, [&]
                {
@@ -469,9 +473,29 @@ public:
                if(!is_running_)
                        CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(u8(print()) + " Is not running."));
                
-               if(config_.embedded_audio)
-                       audio_frame_buffer_.push(frame);        
-               video_frame_buffer_.push(frame);        
+               bool audio_ready = !config_.embedded_audio;
+               bool video_ready = false;
+
+               auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>
+               {
+                       if (!audio_ready)
+                               audio_ready = audio_frame_buffer_.try_push(frame);
+
+                       if (!video_ready)
+                               video_ready = video_frame_buffer_.try_push(frame);
+
+                       if (audio_ready && video_ready)
+                               return true;
+                       else
+                               return boost::optional<bool>();
+               };
+               
+               if (enqueue_task())
+                       return wrap_as_future(true);
+
+               send_completion_.set_task(enqueue_task);
+
+               return send_completion_.get_future();
        }
        
        std::wstring print() const
@@ -517,10 +541,9 @@ public:
                });
        }
        
-       bool send(core::const_frame frame) override
+       boost::unique_future<bool> send(core::const_frame frame) override
        {
-               consumer_->send(frame);
-               return true;
+               return consumer_->send(frame);
        }
        
        std::wstring print() const override