]> git.sesse.net Git - casparcg/commitdiff
Experimental support for synchronizing output of multiple consumers. For example...
authorHelge Norberg <helge.norberg@gmail.com>
Tue, 18 Jun 2013 12:26:43 +0000 (14:26 +0200)
committerHelge Norberg <helge.norberg@gmail.com>
Tue, 18 Jun 2013 12:26:43 +0000 (14:26 +0200)
<consumers>
  <synchronizing>
    <decklink>
      <device>1</device>
      <embedded-audio>true</embedded-audio>
    </decklink>
    <decklink>
      <device>2</device>
      <key-only>true</key-only>
    </decklink>
  </synchronizing>
</consumers>

Also added new INFO command showing some information about delays (producer - mixer - consumer):

INFO 1 DELAY
INFO 1-10 DELAY

35 files changed:
CHANGES.txt
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/consumer/output.cpp
core/consumer/output.h
core/consumer/synchronizing/synchronizing_consumer.cpp [new file with mode: 0644]
core/consumer/synchronizing/synchronizing_consumer.h [new file with mode: 0644]
core/core.vcxproj
core/core.vcxproj.filters
core/mixer/mixer.cpp
core/mixer/mixer.h
core/mixer/read_frame.cpp
core/mixer/read_frame.h
core/mixer/write_frame.cpp
core/mixer/write_frame.h
core/producer/channel/channel_producer.cpp
core/producer/frame/basic_frame.cpp
core/producer/frame/basic_frame.h
core/producer/layer.cpp
core/producer/layer.h
core/producer/stage.cpp
core/producer/stage.h
core/video_channel.cpp
core/video_channel.h
modules/bluefish/consumer/bluefish_consumer.cpp
modules/decklink/consumer/decklink_consumer.cpp
modules/decklink/decklink.vcxproj
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/flash/producer/flash_producer.cpp
modules/image/consumer/image_consumer.cpp
modules/oal/consumer/oal_consumer.cpp
modules/ogl/consumer/ogl_consumer.cpp
protocol/amcp/AMCPCommandsImpl.cpp
shell/casparcg.config
shell/server.cpp

index 9d290743c7d489a3274e0660e1c26d422fc53f6d..2ea84170028f81f3b648cf9bd6e8dfadd53fa349 100644 (file)
@@ -18,6 +18,27 @@ General
     http://casparcg.com/forum/viewtopic.php?f=3&t=1453 for more information.\r
   o ATI graphics card support. Static linking against SFML instead of dynamic.\r
     Should inprove ATI card support. Needs testing.\r
+  o An attempt to improve output synchronization of consumers has been made. Use\r
+    for example:\r
+    \r
+    <consumers>\r
+      <synchronizing>\r
+        <decklink>\r
+          <device>1</device>\r
+          <embedded-audio>true</embedded-audio>\r
+        </decklink>\r
+        <decklink>\r
+          <device>2</device>\r
+          <key-only>true</key-only>\r
+        </decklink>\r
+      </synchronizing>\r
+    </consumers>\r
+    \r
+    to say that both decklink consumers should be in sync with each other.\r
+    Consider this experimental, so don't wrap everything in <synchronizing />\r
+    unless synchronization of consumer outputs is needed. For synchronization to\r
+    be effective both cards (or same if it is for example a Decklink Quad) must\r
+    have reference signal connected.\r
 \r
 Layer\r
 -----\r
@@ -85,6 +106,8 @@ AMCP
     MIXER FILL, causing the image to be flipped.\r
   o MIXER <ch> STRAIGHT_ALPHA_OUTPUT added to control whether to output straight\r
     alpha or not.\r
+  o Added INFO <ch> DELAY and INFO <ch>-<layer> DELAY commands for showing some\r
+    delay measurements.\r
 \r
 OSC\r
 ---\r
index f8a87c3a1bb455efd6099437a7a77eb717b64e8b..012aaff909af58b52d6c3d6b84fbaec931229028 100644 (file)
@@ -88,6 +88,11 @@ public:
                consumer_->initialize(format_desc, channel_index);\r
        }\r
 \r
+       virtual int64_t presentation_frame_age_millis() const\r
+       {\r
+               return consumer_->presentation_frame_age_millis();\r
+       }\r
+\r
        virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) override\r
        {               \r
                if(audio_cadence_.size() == 1)\r
@@ -146,6 +151,7 @@ const safe_ptr<frame_consumer>& frame_consumer::empty()
        {\r
                virtual boost::unique_future<bool> send(const safe_ptr<read_frame>&) override { return caspar::wrap_as_future(false); }\r
                virtual void initialize(const video_format_desc&, int) override{}\r
+               virtual int64_t presentation_frame_age_millis() const { return 0; }\r
                virtual std::wstring print() const override {return L"empty";}\r
                virtual bool has_synchronization_clock() const override {return false;}\r
                virtual size_t buffer_depth() const override {return 0;};\r
index 52e2c321107b9701cad643f9895ffb377b5426cb..cdc058c474b710fdcc91db88c7fd1a9d572c8910 100644 (file)
@@ -43,6 +43,7 @@ struct frame_consumer : boost::noncopyable
        \r
        virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) = 0;\r
        virtual void initialize(const video_format_desc& format_desc, int channel_index) = 0;\r
+       virtual int64_t presentation_frame_age_millis() const = 0;\r
        virtual std::wstring print() const = 0;\r
        virtual boost::property_tree::wptree info() const = 0;\r
        virtual bool has_synchronization_clock() const {return true;}\r
index 665c5d10bdcdab7cf211bba7916c09a31ced1dd1..32d502c789a3a34dd1fbccc5e798360a2b264d62 100644 (file)
@@ -58,6 +58,7 @@ struct output::implementation
        high_prec_timer                                                                 sync_timer_;\r
 \r
        boost::circular_buffer<safe_ptr<read_frame>>    frames_;\r
+       std::map<int, int64_t>                                                  send_to_consumers_delays_;\r
 \r
        executor                                                                                executor_;\r
                \r
@@ -69,8 +70,8 @@ public:
                , executor_(L"output")\r
        {\r
                graph_->set_color("consume-time", diagnostics::color(1.0f, 0.4f, 0.0f, 0.8));\r
-       }       \r
-       \r
+       }\r
+\r
        void add(int index, safe_ptr<frame_consumer> consumer)\r
        {               \r
                remove(index);\r
@@ -101,6 +102,7 @@ public:
                        if(it != consumers_.end())\r
                        {\r
                                old_consumer = it->second;\r
+                               send_to_consumers_delays_.erase(it->first);\r
                                consumers_.erase(it);\r
                        }\r
                }, high_priority);\r
@@ -134,6 +136,7 @@ public:
                                {\r
                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                                        CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
+                                       send_to_consumers_delays_.erase(it->first);\r
                                        consumers_.erase(it++);\r
                                }\r
                        }\r
@@ -142,18 +145,30 @@ public:
                        frames_.clear();\r
                });\r
        }\r
+       \r
+       std::map<int, size_t> buffer_depths_snapshot() const\r
+       {\r
+               std::map<int, size_t> result;\r
+\r
+               BOOST_FOREACH(auto& consumer, consumers_)\r
+                       result.insert(std::make_pair(\r
+                                       consumer.first,\r
+                                       consumer.second->buffer_depth()));\r
 \r
-       std::pair<size_t, size_t> minmax_buffer_depth() const\r
+               return std::move(result);\r
+       }\r
+\r
+       std::pair<size_t, size_t> minmax_buffer_depth(\r
+                       const std::map<int, size_t>& buffer_depths) const\r
        {               \r
                if(consumers_.empty())\r
                        return std::make_pair(0, 0);\r
                \r
-               auto buffer_depths = consumers_ | \r
-                                                        boost::adaptors::map_values | // std::function is MSVC workaround\r
-                                                        boost::adaptors::transformed(std::function<int(const safe_ptr<frame_consumer>&)>([](const safe_ptr<frame_consumer>& x){return x->buffer_depth();})); \r
+               auto depths = buffer_depths | boost::adaptors::map_values; \r
                \r
-\r
-               return std::make_pair(*boost::range::min_element(buffer_depths), *boost::range::max_element(buffer_depths));\r
+               return std::make_pair(\r
+                               *boost::range::min_element(depths),\r
+                               *boost::range::max_element(depths));\r
        }\r
 \r
        bool has_synchronization_clock() const\r
@@ -179,8 +194,9 @@ public:
                                        sync_timer_.tick(1.0/format_desc_.fps);\r
                                        return;\r
                                }\r
-                                       \r
-                               auto minmax = minmax_buffer_depth();\r
+                               \r
+                               auto buffer_depths = buffer_depths_snapshot();\r
+                               auto minmax = minmax_buffer_depth(buffer_depths);\r
 \r
                                frames_.set_capacity(minmax.second - minmax.first + 1);\r
                                frames_.push_back(input_frame);\r
@@ -194,7 +210,9 @@ public:
                                for (auto it = consumers_.begin(); it != consumers_.end();)\r
                                {\r
                                        auto consumer   = it->second;\r
-                                       auto frame              = frames_.at(consumer->buffer_depth()-minmax.first);\r
+                                       auto frame              = frames_.at(buffer_depths[it->first]-minmax.first);\r
+\r
+                                       send_to_consumers_delays_[it->first] = frame->get_age_millis();\r
                                                \r
                                        try\r
                                        {\r
@@ -213,6 +231,7 @@ public:
                                                {\r
                                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                                                        CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
+                                                       send_to_consumers_delays_.erase(it->first);\r
                                                        it = consumers_.erase(it);\r
                                                }\r
                                        }\r
@@ -222,7 +241,7 @@ public:
                                for (auto result_it = send_results.begin(); result_it != send_results.end(); ++result_it)\r
                                {\r
                                        auto consumer           = consumers_.at(result_it->first);\r
-                                       auto frame                      = frames_.at(consumer->buffer_depth()-minmax.first);\r
+                                       auto frame                      = frames_.at(buffer_depths[result_it->first]-minmax.first);\r
                                        auto& result_future     = result_it->second;\r
                                                \r
                                        try\r
@@ -230,6 +249,7 @@ public:
                                                if(!result_future.get())\r
                                                {\r
                                                        CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+                                                       send_to_consumers_delays_.erase(result_it->first);\r
                                                        consumers_.erase(result_it->first);\r
                                                }\r
                                        }\r
@@ -242,6 +262,7 @@ public:
                                                        if(!consumer->send(frame).get())\r
                                                        {\r
                                                                CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+                                                               send_to_consumers_delays_.erase(result_it->first);\r
                                                                consumers_.erase(result_it->first);\r
                                                        }\r
                                                }\r
@@ -249,6 +270,7 @@ public:
                                                {\r
                                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                                                        CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
+                                                       send_to_consumers_delays_.erase(result_it->first);\r
                                                        consumers_.erase(result_it->first);\r
                                                }\r
                                        }\r
@@ -282,6 +304,30 @@ public:
                }, high_priority));\r
        }\r
 \r
+       boost::unique_future<boost::property_tree::wptree> delay_info()\r
+       {\r
+               return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree\r
+               {                       \r
+                       boost::property_tree::wptree info;\r
+                       BOOST_FOREACH(auto& consumer, consumers_)\r
+                       {\r
+                               auto total_age =\r
+                                               consumer.second->presentation_frame_age_millis();\r
+                               auto sendoff_age = send_to_consumers_delays_[consumer.first];\r
+                               auto presentation_time = total_age - sendoff_age;\r
+\r
+                               boost::property_tree::wptree child;\r
+                               child.add(L"name", consumer.second->print());\r
+                               child.add(L"age-at-arrival", sendoff_age);\r
+                               child.add(L"presentation-time", presentation_time);\r
+                               child.add(L"age-at-presentation", total_age);\r
+\r
+                               info.add_child(L"consumer", child);\r
+                       }\r
+                       return info;\r
+               }, high_priority));\r
+       }\r
+\r
        bool empty()\r
        {\r
                return executor_.invoke([this]\r
@@ -299,5 +345,6 @@ void output::remove(const safe_ptr<frame_consumer>& consumer){impl_->remove(cons
 void output::send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) {impl_->send(frame); }\r
 void output::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
 boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}\r
+boost::unique_future<boost::property_tree::wptree> output::delay_info() const{return impl_->delay_info();}\r
 bool output::empty() const{return impl_->empty();}\r
 }}
\ No newline at end of file
index a301ff935fb1f2d9c9fea7f9c42622df28552768..d3a2b1fdeabe34d3cc6f8de28ca4623891cfe2bc 100644 (file)
@@ -53,6 +53,7 @@ public:
        void set_video_format_desc(const video_format_desc& format_desc);\r
 \r
        boost::unique_future<boost::property_tree::wptree> info() const;\r
+       boost::unique_future<boost::property_tree::wptree> delay_info() const;\r
 \r
        bool empty() const;\r
 private:\r
diff --git a/core/consumer/synchronizing/synchronizing_consumer.cpp b/core/consumer/synchronizing/synchronizing_consumer.cpp
new file mode 100644 (file)
index 0000000..d19f3e9
--- /dev/null
@@ -0,0 +1,421 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../../StdAfx.h"
+
+#include "synchronizing_consumer.h"
+
+#include <common/log/log.h>
+#include <common/diagnostics/graph.h>
+#include <common/concurrency/future_util.h>
+
+#include <core/video_format.h>
+
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm/min_element.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/algorithm/for_each.hpp>
+#include <boost/range/algorithm/count_if.hpp>
+#include <boost/range/numeric.hpp>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/thread/future.hpp>
+
+#include <functional>
+#include <vector>
+#include <queue>
+#include <utility>
+
+#include <tbb/atomic.h>
+
+namespace caspar { namespace core {
+
+using namespace boost::adaptors;
+
+class delegating_frame_consumer : public frame_consumer
+{
+       safe_ptr<frame_consumer> consumer_;
+public:
+       delegating_frame_consumer(const safe_ptr<frame_consumer>& consumer)
+               : consumer_(consumer)
+       {
+       }
+
+       frame_consumer& get_delegate()
+       {
+               return *consumer_;
+       }
+
+       const frame_consumer& get_delegate() const
+       {
+               return *consumer_;
+       }
+       
+       virtual void initialize(
+                       const video_format_desc& format_desc, int channel_index) override
+       {
+               get_delegate().initialize(format_desc, channel_index);
+       }
+
+       virtual int64_t presentation_frame_age_millis() const
+       {
+               return get_delegate().presentation_frame_age_millis();
+       }
+
+       virtual boost::unique_future<bool> send(
+                       const safe_ptr<read_frame>& frame) override
+       {               
+               return get_delegate().send(frame);
+       }
+
+       virtual std::wstring print() const override
+       {
+               return get_delegate().print();
+       }
+
+       virtual boost::property_tree::wptree info() const override
+       {
+               return get_delegate().info();
+       }
+
+       virtual bool has_synchronization_clock() const override
+       {
+               return get_delegate().has_synchronization_clock();
+       }
+
+       virtual size_t buffer_depth() const override
+       {
+               return get_delegate().buffer_depth();
+       }
+
+       virtual int index() const override
+       {
+               return get_delegate().index();
+       }
+};
+
+class buffering_consumer_adapter : public delegating_frame_consumer
+{
+       std::queue<safe_ptr<read_frame>>        buffer_;
+       tbb::atomic<size_t>                                     buffered_;
+       tbb::atomic<int64_t>                            duplicate_next_;
+public:
+       buffering_consumer_adapter(const safe_ptr<frame_consumer>& consumer)
+               : delegating_frame_consumer(consumer)
+       {
+               buffered_ = 0;
+               duplicate_next_ = 0;
+       }
+
+       boost::unique_future<bool> consume_one()
+       {
+               if (!buffer_.empty())
+               {
+                       buffer_.pop();
+                       --buffered_;
+               }
+
+               return get_delegate().send(buffer_.front());
+       }
+
+       virtual boost::unique_future<bool> send(
+                       const safe_ptr<read_frame>& frame) override
+       {
+               if (duplicate_next_)
+               {
+                       --duplicate_next_;
+               }
+               else if (!buffer_.empty())
+               {
+                       buffer_.pop();
+                       --buffered_;
+               }
+
+               buffer_.push(frame);
+               ++buffered_;
+
+               return get_delegate().send(buffer_.front());
+       }
+
+       void duplicate_next(int64_t to_duplicate)
+       {
+               duplicate_next_ += to_duplicate;
+       }
+
+       size_t num_buffered() const
+       {
+               return buffered_ - 1;
+       }
+
+       virtual std::wstring print() const override
+       {
+               return L"buffering[" + get_delegate().print() + L"]";
+       }
+
+       virtual boost::property_tree::wptree info() const override
+       {
+               boost::property_tree::wptree info;
+
+               info.add(L"type", L"buffering-consumer-adapter");
+               info.add_child(L"consumer", get_delegate().info());
+               info.add(L"buffered-frames", num_buffered());
+
+               return info;
+       }
+};
+
+static const uint64_t MAX_BUFFERED_OUT_OF_MEMORY_GUARD = 5;
+
+struct synchronizing_consumer::implementation
+{
+private:
+       std::vector<safe_ptr<buffering_consumer_adapter>>       consumers_;
+       size_t                                                                                          buffer_depth_;
+       bool                                                                                            has_synchronization_clock_;
+       std::vector<boost::unique_future<bool>>                         results_;
+       boost::promise<bool>                                                            promise_;
+       video_format_desc                                                                       format_desc_;
+       safe_ptr<diagnostics::graph>                                            graph_;
+       int64_t                                                                                         grace_period_;
+       tbb::atomic<int64_t>                                                            current_diff_;
+public:
+       implementation(const std::vector<safe_ptr<frame_consumer>>& consumers)
+               : grace_period_(0)
+       {
+               BOOST_FOREACH(auto& consumer, consumers)
+                       consumers_.push_back(make_safe<buffering_consumer_adapter>(consumer));
+
+               current_diff_ = 0;
+               auto buffer_depths = consumers | transformed(std::mem_fn(&frame_consumer::buffer_depth));
+               std::vector<size_t> depths(buffer_depths.begin(), buffer_depths.end());
+               buffer_depth_ = *boost::max_element(depths);
+               has_synchronization_clock_ = boost::count_if(consumers, std::mem_fn(&frame_consumer::has_synchronization_clock)) > 0;
+
+               graph_->set_text(print());
+               diagnostics::register_graph(graph_);
+       }
+
+       boost::unique_future<bool> send(const safe_ptr<read_frame>& frame)
+       {
+               results_.clear();
+
+               BOOST_FOREACH(auto& consumer, consumers_)
+                       results_.push_back(consumer->send(frame));
+
+               promise_ = boost::promise<bool>();
+               promise_.set_wait_callback(std::function<void(boost::promise<bool>&)>([this](boost::promise<bool>& promise)
+               {
+                       BOOST_FOREACH(auto& result, results_)
+                       {
+                               result.get();
+                       }
+
+                       auto frame_ages = consumers_ | transformed(std::mem_fn(&frame_consumer::presentation_frame_age_millis));
+                       std::vector<int64_t> ages(frame_ages.begin(), frame_ages.end());
+                       auto max_age_iter = boost::max_element(ages);
+                       auto min_age_iter = boost::min_element(ages);
+                       int64_t min_age = *min_age_iter;
+                       
+                       if (min_age == 0)
+                       {
+                               // One of the consumers have yet no measurement, wait until next 
+                               // frame until we make any assumptions.
+                               promise.set_value(true);
+                               return;
+                       }
+
+                       int64_t max_age = *max_age_iter;
+                       int64_t age_diff = max_age - min_age;
+
+                       current_diff_ = age_diff;
+
+                       for (unsigned i = 0; i < ages.size(); ++i)
+                               graph_->set_value(narrow(consumers_[i]->print()), static_cast<double>(ages[i]) / *max_age_iter);
+
+                       bool grace_period_over = grace_period_ == 1;
+
+                       if (grace_period_)
+                               --grace_period_;
+
+                       if (grace_period_ == 0)
+                       {
+                               int64_t frame_duration = static_cast<int64_t>(1000 / format_desc_.fps);
+
+                               if (age_diff >= frame_duration)
+                               {
+                                       CASPAR_LOG(info) << print() << L" Consumers not in sync. min: " << min_age << L" max: " << max_age;
+
+                                       auto index = min_age_iter - ages.begin();
+                                       auto to_duplicate = age_diff / frame_duration;
+                                       auto& consumer = *consumers_.at(index);
+
+                                       auto currently_buffered = consumer.num_buffered();
+
+                                       if (currently_buffered + to_duplicate > MAX_BUFFERED_OUT_OF_MEMORY_GUARD)
+                                       {
+                                               CASPAR_LOG(info) << print() << L" Protecting from out of memory. Duplicating less frames than calculated";
+
+                                               to_duplicate = MAX_BUFFERED_OUT_OF_MEMORY_GUARD - currently_buffered;
+                                       }
+
+                                       consumer.duplicate_next(to_duplicate);
+
+                                       grace_period_ = 10 + to_duplicate + buffer_depth_;
+                               }
+                               else if (grace_period_over)
+                               {
+                                       CASPAR_LOG(info) << print() << L" Consumers resynced. min: " << min_age << L" max: " << max_age;
+                               }
+                       }
+
+                       blocking_consume_unnecessarily_buffered();
+
+                       promise.set_value(true);
+               }));
+
+               return promise_.get_future();
+       }
+
+       void blocking_consume_unnecessarily_buffered()
+       {
+               auto buffered = consumers_ | transformed(std::mem_fn(&buffering_consumer_adapter::num_buffered));
+               std::vector<size_t> num_buffered(buffered.begin(), buffered.end());
+               auto min_buffered = *boost::min_element(num_buffered);
+
+               if (min_buffered)
+                       CASPAR_LOG(info) << print() << L" " << min_buffered 
+                                       << L" frames unnecessarily buffered. Consuming and letting channel pause during that time.";
+
+               while (min_buffered)
+               {
+                       std::vector<boost::unique_future<bool>> results;
+
+                       BOOST_FOREACH(auto& consumer, consumers_)
+                               results.push_back(consumer->consume_one());
+
+                       BOOST_FOREACH(auto& result, results)
+                               result.get();
+
+                       --min_buffered;
+               }
+       }
+
+       void initialize(const video_format_desc& format_desc, int channel_index)
+       {
+               boost::for_each(
+                               consumers_, 
+                               [&] (const safe_ptr<frame_consumer>& consumer) 
+                               {
+                                       consumer->initialize(format_desc, channel_index); 
+                               });
+               format_desc_ = format_desc;
+       }
+
+       int64_t presentation_frame_age_millis() const
+       {
+               int64_t result = 0;
+
+               BOOST_FOREACH(auto& consumer, consumers_)
+                       result = std::max(result, consumer->presentation_frame_age_millis());
+
+               return result;
+       }
+
+       std::wstring print() const
+       {
+               return L"synchronized[" + boost::algorithm::join(consumers_ | transformed(std::mem_fn(&frame_consumer::print)), L"|") +  L"]";
+       }
+
+       boost::property_tree::wptree info() const
+       {
+               boost::property_tree::wptree info;
+
+               info.add(L"type", L"synchronized-consumer");
+
+               BOOST_FOREACH(auto& consumer, consumers_)
+                       info.add_child(L"consumer", consumer->info());
+
+               info.add(L"age-diff", current_diff_);
+
+               return info;
+       }
+
+       bool has_synchronization_clock() const
+       {
+               return has_synchronization_clock_;
+       }
+
+       size_t buffer_depth() const
+       {
+               return buffer_depth_;
+       }
+
+       int index() const
+       {
+               return boost::accumulate(consumers_ | transformed(std::mem_fn(&frame_consumer::index)), 10000);
+       }
+};
+
+synchronizing_consumer::synchronizing_consumer(const std::vector<safe_ptr<frame_consumer>>& consumers)
+       : impl_(new implementation(consumers))
+{
+}
+
+boost::unique_future<bool> synchronizing_consumer::send(const safe_ptr<read_frame>& frame)
+{
+       return impl_->send(frame);
+}
+
+void synchronizing_consumer::initialize(const video_format_desc& format_desc, int channel_index)
+{
+       impl_->initialize(format_desc, channel_index);
+}
+
+int64_t synchronizing_consumer::presentation_frame_age_millis() const
+{
+       return impl_->presentation_frame_age_millis();
+}
+
+std::wstring synchronizing_consumer::print() const
+{
+       return impl_->print();
+}
+
+boost::property_tree::wptree synchronizing_consumer::info() const
+{
+       return impl_->info();
+}
+
+bool synchronizing_consumer::has_synchronization_clock() const
+{
+       return impl_->has_synchronization_clock();
+}
+
+size_t synchronizing_consumer::buffer_depth() const
+{
+       return impl_->buffer_depth();
+}
+
+int synchronizing_consumer::index() const
+{
+       return impl_->index();
+}
+
+}}
diff --git a/core/consumer/synchronizing/synchronizing_consumer.h b/core/consumer/synchronizing/synchronizing_consumer.h
new file mode 100644 (file)
index 0000000..022537b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include "../frame_consumer.h"
+
+#include <vector>
+#include <memory>
+
+namespace caspar { namespace core {
+
+class read_frame;
+struct video_format_desc;
+
+class synchronizing_consumer : public frame_consumer
+{
+public:
+       synchronizing_consumer(
+                       const std::vector<safe_ptr<frame_consumer>>& consumers);
+       virtual boost::unique_future<bool> send(
+                       const safe_ptr<read_frame>& frame) override;
+       virtual void initialize(
+                       const video_format_desc& format_desc, int channel_index) override;
+       virtual int64_t presentation_frame_age_millis() const override;
+       virtual std::wstring print() const override;
+       virtual boost::property_tree::wptree info() const override;
+       virtual bool has_synchronization_clock() const override;
+       virtual size_t buffer_depth() const override;
+       virtual int index() const override;
+private:
+       struct implementation;
+       std::unique_ptr<implementation> impl_;
+};
+
+}}
index a28e87e2ae047b5fc1e63a523a17f8e39b8bdcf5..1c3fc9358bb385814849c4b770df9889f9071d6b 100644 (file)
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>\r
+<?xml version="1.0" encoding="utf-8"?>\r
 <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">\r
   <ItemGroup Label="ProjectConfigurations">\r
     <ProjectConfiguration Include="Profile|Win32">\r
     <Lib />\r
   </ItemDefinitionGroup>\r
   <ItemGroup>\r
+    <ClInclude Include="consumer\synchronizing\synchronizing_consumer.h" />\r
     <ClInclude Include="mixer\audio\audio_util.h" />\r
     <ClInclude Include="mixer\gpu\fence.h" />\r
     <ClInclude Include="mixer\gpu\shader.h" />\r
     <ClInclude Include="StdAfx.h" />\r
   </ItemGroup>\r
   <ItemGroup>\r
+    <ClCompile Include="consumer\synchronizing\synchronizing_consumer.cpp">\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
+      <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
+    </ClCompile>\r
     <ClCompile Include="mixer\audio\audio_util.cpp">\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
       <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../../StdAfx.h</PrecompiledHeaderFile>\r
index ab0981f716c297ecc11efed0beedea773a903555..5138ec7dced13f0f8bc90acecd1bd3224b4c2ad9 100644 (file)
@@ -43,6 +43,9 @@
     <Filter Include="source\monitor">\r
       <UniqueIdentifier>{d8525088-072a-47d2-b6e1-ad662881f505}</UniqueIdentifier>\r
     </Filter>\r
+    <Filter Include="source\consumer\synchronizing">\r
+      <UniqueIdentifier>{19ddc31c-5865-46d5-a8a6-a96d6fa1ffc7}</UniqueIdentifier>\r
+    </Filter>\r
     <Filter Include="source\parameters">\r
       <UniqueIdentifier>{d04737a6-96b2-40cd-b1e7-e90b69006cd1}</UniqueIdentifier>\r
     </Filter>\r
     <ClInclude Include="monitor\monitor.h">\r
       <Filter>source\monitor</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="consumer\synchronizing\synchronizing_consumer.h">\r
+      <Filter>source\consumer\synchronizing</Filter>\r
+    </ClInclude>\r
     <ClInclude Include="parameters\parameters.h">\r
       <Filter>source\parameters</Filter>\r
     </ClInclude>\r
     <ClCompile Include="mixer\audio\audio_util.cpp">\r
       <Filter>source\mixer\audio</Filter>\r
     </ClCompile>\r
+    <ClCompile Include="consumer\synchronizing\synchronizing_consumer.cpp">\r
+      <Filter>source\consumer\synchronizing</Filter>\r
+    </ClCompile>\r
     <ClCompile Include="parameters\parameters.cpp">\r
       <Filter>source\parameters</Filter>\r
     </ClCompile>\r
index 04904d4df5e3c166507ed1a3b2da79b76d16051b..25a6a99f76f034ed98ab6633cac2285d40b6da7e 100644 (file)
@@ -31,6 +31,7 @@
 \r
 #include <common/env.h>\r
 #include <common/concurrency/executor.h>\r
+#include <common/concurrency/future_util.h>\r
 #include <common/exception/exceptions.h>\r
 #include <common/gl/gl_check.h>\r
 #include <common/utility/tweener.h>\r
@@ -51,6 +52,7 @@
 \r
 #include <tbb/concurrent_queue.h>\r
 #include <tbb/spin_mutex.h>\r
+#include <tbb/atomic.h>\r
 \r
 #include <unordered_map>\r
 \r
@@ -60,6 +62,7 @@ struct mixer::implementation : boost::noncopyable
 {              \r
        safe_ptr<diagnostics::graph>    graph_;\r
        boost::timer                                    mix_timer_;\r
+       tbb::atomic<int64_t>                    current_mix_time_;\r
 \r
        safe_ptr<mixer::target_t>               target_;\r
        mutable tbb::spin_mutex                 format_desc_mutex_;\r
@@ -88,6 +91,7 @@ public:
                , executor_(L"mixer")\r
        {                       \r
                graph_->set_color("mix-time", diagnostics::color(1.0f, 0.0f, 0.9f, 0.8));\r
+               current_mix_time_ = 0;\r
        }\r
        \r
        void send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>& packet)\r
@@ -115,7 +119,9 @@ public:
                                auto audio = audio_mixer_(format_desc_, audio_channel_layout_);\r
                                image.wait();\r
 \r
-                               graph_->set_value("mix-time", mix_timer_.elapsed()*format_desc_.fps*0.5);\r
+                               auto mix_time = mix_timer_.elapsed();\r
+                               graph_->set_value("mix-time", mix_time*format_desc_.fps*0.5);\r
+                               current_mix_time_ = static_cast<int64_t>(mix_time * 1000.0);\r
 \r
                                target_->send(std::make_pair(make_safe<read_frame>(ogl_, format_desc_.size, std::move(image.get()), std::move(audio), audio_channel_layout_), packet.second));\r
                        }\r
@@ -231,9 +237,18 @@ public:
 \r
        boost::unique_future<boost::property_tree::wptree> info() const\r
        {\r
-               boost::promise<boost::property_tree::wptree> info;\r
-               info.set_value(boost::property_tree::wptree());\r
-               return info.get_future();\r
+               boost::property_tree::wptree info;\r
+               info.add(L"mix-time", current_mix_time_);\r
+\r
+               return wrap_as_future(std::move(info));\r
+       }\r
+\r
+       boost::unique_future<boost::property_tree::wptree> delay_info() const\r
+       {\r
+               boost::property_tree::wptree info;\r
+               info.put_value(current_mix_time_);\r
+\r
+               return wrap_as_future(std::move(info));\r
        }\r
 };\r
        \r
@@ -254,4 +269,5 @@ float mixer::get_master_volume() { return impl_->get_master_volume(); }
 void mixer::set_master_volume(float volume) { impl_->set_master_volume(volume); }\r
 void mixer::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
 boost::unique_future<boost::property_tree::wptree> mixer::info() const{return impl_->info();}\r
+boost::unique_future<boost::property_tree::wptree> mixer::delay_info() const{return impl_->delay_info();}\r
 }}
\ No newline at end of file
index 3c59f5cb21bbca3038bf8b92e3ee4f51f10481ee..0b3aad03c66ba91464f313b55198706c3fe204b2 100644 (file)
@@ -80,6 +80,7 @@ public:
        void set_master_volume(float volume);\r
 \r
        boost::unique_future<boost::property_tree::wptree> info() const;\r
+       boost::unique_future<boost::property_tree::wptree> delay_info() const;\r
        \r
 private:\r
        struct implementation;\r
index 8c1e3c7639a25e23960cdb39ce1abc2d2e06bf13..f8c4215325e18ee27332e26fbe94c4bcfdfe6681 100644 (file)
 \r
 #include <tbb/mutex.h>\r
 \r
+#include <boost/chrono.hpp>\r
+\r
 namespace caspar { namespace core {\r
+\r
+int64_t get_current_time_millis()\r
+{\r
+       using namespace boost::chrono;\r
+\r
+       return duration_cast<milliseconds>(\r
+                       high_resolution_clock::now().time_since_epoch()).count();\r
+}\r
                                                                                                                                                                                                                                                                                                                        \r
 struct read_frame::implementation : boost::noncopyable\r
 {\r
@@ -39,6 +49,7 @@ struct read_frame::implementation : boost::noncopyable
        tbb::mutex                                      mutex_;\r
        audio_buffer                            audio_data_;\r
        channel_layout                          audio_channel_layout_;\r
+       int64_t                                         created_timestamp_;\r
 \r
 public:\r
        implementation(\r
@@ -52,6 +63,7 @@ public:
                , image_data_(std::move(image_data))\r
                , audio_data_(std::move(audio_data))\r
                , audio_channel_layout_(audio_channel_layout)\r
+               , created_timestamp_(get_current_time_millis())\r
        {\r
        }       \r
        \r
@@ -107,6 +119,11 @@ const multichannel_view<const int32_t, boost::iterator_range<const int32_t*>::co
                        impl_->audio_channel_layout_);\r
 }\r
 \r
+int64_t read_frame::get_age_millis() const\r
+{\r
+       return impl_ ? get_current_time_millis() - impl_->created_timestamp_ : 0;\r
+}\r
+\r
 //#include <tbb/scalable_allocator.h>\r
 //#include <tbb/parallel_for.h>\r
 //#include <tbb/enumerable_thread_specific.h>\r
index 8817436008a9e78ccf36c88b7e21b7a664ea43dc..96a851ea498d0cdab7b8af4c2dd0f985e5d664eb 100644 (file)
@@ -54,6 +54,7 @@ public:
 \r
        virtual size_t image_size() const;\r
        virtual int num_channels() const;\r
+       virtual int64_t get_age_millis() const;\r
        virtual const multichannel_view<const int32_t, boost::iterator_range<const int32_t*>::const_iterator> multichannel_view() const;\r
                \r
 private:\r
index c46e78a7e7e4821dba19cad1b8be30eb67304a4c..37ac8ae8ff14919c75205fb8a1ac22bab560771c 100644 (file)
@@ -32,6 +32,9 @@
 #include <core/mixer/audio/audio_util.h>\r
 \r
 #include <boost/lexical_cast.hpp>\r
+#include <boost/timer.hpp>\r
+\r
+#include <tbb/atomic.h>\r
 \r
 namespace caspar { namespace core {\r
                                                                                                                                                                                                                                                                                                                        \r
@@ -45,11 +48,14 @@ struct write_frame::implementation
        const channel_layout                                            channel_layout_;\r
        const void*                                                                     tag_;\r
        core::field_mode::type                                          mode_;\r
+       boost::timer                                                            since_created_timer_;\r
+       tbb::atomic<int64_t>                                            recorded_frame_age_;\r
 \r
        implementation(const void* tag, const channel_layout& channel_layout)\r
                : channel_layout_(channel_layout)\r
                , tag_(tag)\r
        {\r
+               recorded_frame_age_ = -1;\r
        }\r
 \r
        implementation(const safe_ptr<ogl_device>& ogl, const void* tag, const core::pixel_format_desc& desc, const channel_layout& channel_layout) \r
@@ -67,6 +73,8 @@ struct write_frame::implementation
                {\r
                        return ogl_->create_device_buffer(plane.width, plane.height, plane.channels);   \r
                });\r
+\r
+               recorded_frame_age_ = -1;\r
        }\r
                        \r
        void accept(write_frame& self, core::frame_visitor& visitor)\r
@@ -76,6 +84,15 @@ struct write_frame::implementation
                visitor.end();\r
        }\r
 \r
+       int64_t get_and_record_age_millis()\r
+       {\r
+               if (recorded_frame_age_ == -1)\r
+                       recorded_frame_age_ = static_cast<int64_t>(\r
+                                       since_created_timer_.elapsed() * 1000.0);\r
+\r
+               return recorded_frame_age_;\r
+       }\r
+\r
        boost::iterator_range<uint8_t*> image_data(size_t index)\r
        {\r
                if(index >= buffers_.size() || !buffers_[index]->data())\r
@@ -155,5 +172,6 @@ void write_frame::commit(){impl_->commit();}
 void write_frame::set_type(const field_mode::type& mode){impl_->mode_ = mode;}\r
 core::field_mode::type write_frame::get_type() const{return impl_->mode_;}\r
 void write_frame::accept(core::frame_visitor& visitor){impl_->accept(*this, visitor);}\r
+int64_t write_frame::get_and_record_age_millis() { return impl_->get_and_record_age_millis(); }\r
 \r
 }}
\ No newline at end of file
index a2b1435ba97177fa2cb74a979bcade9f18d0a7a3..a096ebd9a474b97e3ef8990e8a3553414172851f 100644 (file)
@@ -56,6 +56,7 @@ public:
        // basic_frame\r
 \r
        virtual void accept(frame_visitor& visitor) override;\r
+       virtual int64_t get_and_record_age_millis() override;\r
 \r
        // write _frame\r
 \r
@@ -75,7 +76,6 @@ public:
        const core::pixel_format_desc& get_pixel_format_desc() const;\r
        const channel_layout& get_channel_layout() const;\r
        multichannel_view<int32_t, audio_buffer::iterator> get_multichannel_view();\r
-       \r
 private:\r
        friend class image_mixer;\r
        \r
index d0cf09c372a711633be35d26add557f0ce8b62f8..9839eaf1d950361c777ac835134ef9c3b2ae72da 100644 (file)
@@ -47,11 +47,13 @@ class channel_consumer : public frame_consumer
        core::video_format_desc                                                                         format_desc_;\r
        int                                                                                                                     channel_index_;\r
        tbb::atomic<bool>                                                                                       is_running_;\r
+       tbb::atomic<int64_t>                                                                            current_age_;\r
 \r
 public:\r
        channel_consumer() \r
        {\r
                is_running_ = true;\r
+               current_age_ = 0;\r
                frame_buffer_.set_capacity(3);\r
        }\r
 \r
@@ -74,6 +76,11 @@ public:
                channel_index_  = channel_index;\r
        }\r
 \r
+       virtual int64_t presentation_frame_age_millis() const override\r
+       {\r
+               return current_age_;\r
+       }\r
+\r
        virtual std::wstring print() const override\r
        {\r
                return L"[channel-consumer|" + boost::lexical_cast<std::wstring>(channel_index_) + L"]";\r
@@ -121,6 +128,7 @@ public:
                        return make_safe<read_frame>();\r
                std::shared_ptr<read_frame> frame;\r
                frame_buffer_.try_pop(frame);\r
+               current_age_ = frame->get_age_millis();\r
                return frame;\r
        }\r
 };\r
index 0190de945199d1ea580f318b6d107e971b844b68..c5acc03de7b65c5a8c8757cfdae4904a5c0cc823 100644 (file)
@@ -35,7 +35,6 @@ struct basic_frame::implementation
        std::vector<safe_ptr<basic_frame>> frames_;\r
 \r
        frame_transform frame_transform_;       \r
-       \r
 public:\r
        implementation(const std::vector<safe_ptr<basic_frame>>& frames) : frames_(frames) \r
        {\r
@@ -51,6 +50,19 @@ public:
        { \r
                frames_.push_back(frame);\r
        }\r
+\r
+       int64_t get_and_record_age_millis(const basic_frame& self)\r
+       {\r
+               int64_t result = 0;\r
+\r
+               BOOST_FOREACH(auto& frame, frames_)\r
+               {\r
+                       if (is_concrete_frame(frame) && frame.get() != &self)\r
+                               result = std::max(result, frame->get_and_record_age_millis());\r
+               }\r
+\r
+               return result;\r
+       }\r
        \r
        void accept(basic_frame& self, frame_visitor& visitor)\r
        {\r
@@ -84,6 +96,7 @@ void basic_frame::swap(basic_frame& other){impl_.swap(other.impl_);}
 \r
 const frame_transform& basic_frame::get_frame_transform() const { return impl_->frame_transform_;}\r
 frame_transform& basic_frame::get_frame_transform() { return impl_->frame_transform_;}\r
+int64_t basic_frame::get_and_record_age_millis() { return impl_->get_and_record_age_millis(*this); }\r
 void basic_frame::accept(frame_visitor& visitor){impl_->accept(*this, visitor);}\r
 \r
 safe_ptr<basic_frame> basic_frame::interlace(const safe_ptr<basic_frame>& frame1, const safe_ptr<basic_frame>& frame2, field_mode::type mode)\r
index e57d7aa265976ba16c8ac6fa85df4d40e9a5e965..3acca647fd525b748ab401de6a25124269a51c2a 100644 (file)
@@ -55,7 +55,8 @@ public:
 \r
        const frame_transform& get_frame_transform() const;\r
        frame_transform& get_frame_transform();\r
-                               \r
+       virtual int64_t get_and_record_age_millis();\r
+\r
        static safe_ptr<basic_frame> interlace(const safe_ptr<basic_frame>& frame1, const safe_ptr<basic_frame>& frame2, field_mode::type mode);\r
        static safe_ptr<basic_frame> combine(const safe_ptr<basic_frame>& frame1, const safe_ptr<basic_frame>& frame2);\r
        static safe_ptr<basic_frame> fill_and_key(const safe_ptr<basic_frame>& fill, const safe_ptr<basic_frame>& key);\r
index 8787d9743e2c672706344f858cfe98dd0a457f54..71dd72bca44c101fe6e6f2a02b8d5ab2e8512a5c 100644 (file)
@@ -37,6 +37,7 @@ struct layer::implementation
        int64_t                                         frame_number_;\r
        int32_t                                         auto_play_delta_;\r
        bool                                            is_paused_;\r
+       int64_t                                         current_frame_age_;\r
        monitor::subject                        monitor_subject_;\r
 \r
 public:\r
@@ -133,7 +134,9 @@ public:
                                play();\r
                                return receive(hints);\r
                        }\r
-                               \r
+\r
+                       current_frame_age_ = frame->get_and_record_age_millis();\r
+\r
                        return frame;\r
                }\r
                catch(...)\r
@@ -165,11 +168,20 @@ public:
 \r
                info.add(L"nb_frames",   nb_frames == std::numeric_limits<int64_t>::max() ? -1 : nb_frames);\r
                info.add(L"frames-left", nb_frames == std::numeric_limits<int64_t>::max() ? -1 : (foreground_->nb_frames() - frame_number_ - auto_play_delta_));\r
+               info.add(L"frame-age", current_frame_age_);\r
                info.add_child(L"foreground.producer", foreground_->info());\r
                info.add_child(L"background.producer", background_->info());\r
                return info;\r
        }\r
 \r
+       boost::property_tree::wptree delay_info() const\r
+       {\r
+               boost::property_tree::wptree info;\r
+               info.add(L"producer", foreground_->print());\r
+               info.add(L"frame-age", current_frame_age_);\r
+               return info;\r
+       }\r
+\r
        void set_foreground(safe_ptr<core::frame_producer> producer)\r
        {\r
                foreground_->monitor_output().unlink_target(&monitor_subject_);\r
@@ -201,5 +213,6 @@ safe_ptr<frame_producer> layer::background() const { return impl_->background_;}
 bool layer::empty() const {return impl_->empty();}\r
 boost::unique_future<std::wstring> layer::call(bool foreground, const std::wstring& param){return impl_->call(foreground, param);}\r
 boost::property_tree::wptree layer::info() const{return impl_->info();}\r
+boost::property_tree::wptree layer::delay_info() const{return impl_->delay_info();}\r
 monitor::source& layer::monitor_output(){return impl_->monitor_subject_;}\r
 }}
\ No newline at end of file
index 9c57a932773a39bf076c028478cc7ab21c432ab8..061eb26b1bb3773fbde7e4175c5e42a95cc912ef 100644 (file)
@@ -64,6 +64,7 @@ public:
        safe_ptr<basic_frame> receive(int hints); // nothrow\r
 \r
        boost::property_tree::wptree info() const;\r
+       boost::property_tree::wptree delay_info() const;\r
        \r
        monitor::source& monitor_output();\r
 private:\r
index 57f7b5834e000547c9dbea728747b5fdf3abfda2..a75651fa2251ccdd8b5e7e580abf33dc3d93998d 100644 (file)
@@ -419,6 +419,26 @@ public:
                        return get_layer(index).info();\r
                }, high_priority));\r
        }\r
+\r
+       boost::unique_future<boost::property_tree::wptree> delay_info()\r
+       {\r
+               return std::move(executor_.begin_invoke([this]() -> boost::property_tree::wptree\r
+               {\r
+                       boost::property_tree::wptree info;\r
+                       BOOST_FOREACH(auto& layer, layers_)                     \r
+                               info.add_child(L"layer", layer.second->delay_info())\r
+                                       .add(L"index", layer.first);    \r
+                       return info;\r
+               }, high_priority));\r
+       }\r
+\r
+       boost::unique_future<boost::property_tree::wptree> delay_info(int index)\r
+       {\r
+               return std::move(executor_.begin_invoke([=]() -> boost::property_tree::wptree\r
+               {\r
+                       return get_layer(index).delay_info();\r
+               }, high_priority));\r
+       }\r
 };\r
 \r
 stage::stage(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc) \r
@@ -444,5 +464,7 @@ boost::unique_future<std::wstring> stage::call(int index, bool foreground, const
 void stage::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
 boost::unique_future<boost::property_tree::wptree> stage::info() const{return impl_->info();}\r
 boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}\r
+boost::unique_future<boost::property_tree::wptree> stage::delay_info() const{return impl_->delay_info();}\r
+boost::unique_future<boost::property_tree::wptree> stage::delay_info(int index) const{return impl_->delay_info(index);}\r
 monitor::source& stage::monitor_output(){return impl_->monitor_subject_;}\r
 }}
\ No newline at end of file
index f8b2505ac4bc6879bbc27306bf0b9a45226fb2f7..213e9b9d92be7bd38a6a7fc795d292714dddb936 100644 (file)
@@ -83,6 +83,9 @@ public:
 \r
        boost::unique_future<boost::property_tree::wptree> info() const;\r
        boost::unique_future<boost::property_tree::wptree> info(int layer) const;\r
+\r
+       boost::unique_future<boost::property_tree::wptree> delay_info() const;\r
+       boost::unique_future<boost::property_tree::wptree> delay_info(int layer) const;\r
        \r
        void set_video_format_desc(const video_format_desc& format_desc);\r
                \r
index 101d93714f1783c3301803f0942df4e922d52792..196590365187efba87670ab24c948370064fec8d 100644 (file)
@@ -75,7 +75,7 @@ public:
 \r
                CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
        }\r
-       \r
+\r
        void set_video_format_desc(const video_format_desc& format_desc)\r
        {\r
                if(format_desc.format == core::video_format::invalid)\r
@@ -122,6 +122,25 @@ public:
    \r
                return info;                       \r
        }\r
+\r
+       boost::property_tree::wptree delay_info() const\r
+       {\r
+               boost::property_tree::wptree info;\r
+\r
+               auto stage_info  = stage_->delay_info();\r
+               auto mixer_info  = mixer_->delay_info();\r
+               auto output_info = output_->delay_info();\r
+\r
+               stage_info.timed_wait(boost::posix_time::seconds(2));\r
+               mixer_info.timed_wait(boost::posix_time::seconds(2));\r
+               output_info.timed_wait(boost::posix_time::seconds(2));\r
+\r
+               info.add_child(L"layers", stage_info.get());\r
+               info.add_child(L"mix-time", mixer_info.get());\r
+               info.add_child(L"consumers", output_info.get());\r
+\r
+               return info;\r
+       }\r
 };\r
 \r
 video_channel::video_channel(int index, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl, const channel_layout& audio_channel_layout) \r
@@ -134,4 +153,5 @@ void video_channel::set_video_format_desc(const video_format_desc& format_desc){
 boost::property_tree::wptree video_channel::info() const{return impl_->info();}\r
 int video_channel::index() const {return impl_->index_;}\r
 monitor::source& video_channel::monitor_output(){return impl_->monitor_subject_;}\r
+boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); }\r
 }}
\ No newline at end of file
index 63d748bafd8a95869033e5839e412db269c82992..984ed5608f4a0af36a7e89645e918fd7fd785d8d 100644 (file)
@@ -61,6 +61,7 @@ public:
        void set_video_format_desc(const video_format_desc& format_desc);\r
        \r
        boost::property_tree::wptree info() const;\r
+       boost::property_tree::wptree delay_info() const;\r
 \r
        int index() const;\r
        \r
index e3107e1a9c761bad208712a7e99608c65ed0d5c7..b6ed886de681473606074cc4ff90b6fa35de6f63 100644 (file)
@@ -40,6 +40,7 @@
 #include <core/mixer/audio/audio_util.h>\r
 \r
 #include <tbb/concurrent_queue.h>\r
+#include <tbb/atomic.h>\r
 \r
 #include <boost/timer.hpp>\r
 #include <boost/range/algorithm.hpp>\r
@@ -70,6 +71,8 @@ struct bluefish_consumer : boost::noncopyable
 \r
        std::array<blue_dma_buffer_ptr, 4>      reserved_frames_;       \r
        tbb::concurrent_bounded_queue<std::shared_ptr<core::read_frame>> frame_buffer_;\r
+       tbb::atomic<int64_t>                            presentation_delay_millis_;\r
+       std::shared_ptr<core::read_frame>       previous_frame_;\r
        \r
        const bool                                                      embedded_audio_;\r
        const bool                                                      key_only_;\r
@@ -95,6 +98,7 @@ public:
                , executor_(print())\r
        {\r
                executor_.set_capacity(1);\r
+               presentation_delay_millis_ = 0;\r
 \r
                graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   \r
                graph_->set_color("sync-time", diagnostics::color(1.0f, 0.0f, 0.0f));\r
@@ -234,7 +238,12 @@ public:
                blue_->wait_output_video_synch(UPD_FMT_FRAME, n_field);\r
                graph_->set_value("sync-time", sync_timer_.elapsed()*format_desc_.fps*0.5);\r
                \r
-               frame_timer_.restart();         \r
+               frame_timer_.restart();\r
+\r
+               if (previous_frame_)\r
+                       presentation_delay_millis_ = previous_frame_->get_age_millis();\r
+\r
+               previous_frame_ = frame;\r
 \r
                // Copy to local buffers\r
                \r
@@ -351,6 +360,11 @@ public:
                return model_name_ + L" [" + boost::lexical_cast<std::wstring>(channel_index_) + L"-" + \r
                        boost::lexical_cast<std::wstring>(device_index_) + L"|" +  format_desc_.name + L"]";\r
        }\r
+\r
+       int64_t presentation_delay_millis() const\r
+       {\r
+               return presentation_delay_millis_;\r
+       }\r
 };\r
 \r
 struct bluefish_consumer_proxy : public core::frame_consumer\r
@@ -402,7 +416,7 @@ public:
                format_desc_ = format_desc;\r
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";   \r
        }\r
-       \r
+\r
        virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                CASPAR_VERIFY(audio_cadence_.front() * frame->num_channels() == static_cast<size_t>(frame->audio_data().size()));\r
@@ -423,10 +437,11 @@ public:
                info.add(L"key-only", key_only_);\r
                info.add(L"device", device_index_);\r
                info.add(L"embedded-audio", embedded_audio_);\r
+               info.add(L"presentation-frame-age", presentation_frame_age_millis());\r
                return info;\r
        }\r
 \r
-       size_t buffer_depth() const override\r
+       virtual size_t buffer_depth() const override\r
        {\r
                return 1;\r
        }\r
@@ -435,6 +450,11 @@ public:
        {\r
                return 400 + device_index_;\r
        }\r
+\r
+       virtual int64_t presentation_frame_age_millis() const override\r
+       {\r
+               return consumer_ ? consumer_->presentation_delay_millis() : 0;\r
+       }\r
 };     \r
 \r
 safe_ptr<core::frame_consumer> create_consumer(const core::parameters& params)\r
index 55903c3704fbeb124ce4f0cb0bad2823c532be70..d99b1442c6e42b28979c95c9e9e543e7d6a25279 100644 (file)
@@ -186,6 +186,11 @@ public:
        {\r
                return frame_->audio_data();\r
        }\r
+\r
+       int64_t get_age_millis() const\r
+       {\r
+               return frame_->get_age_millis();\r
+       }\r
 };\r
 \r
 struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLinkAudioOutputCallback, boost::noncopyable\r
@@ -223,6 +228,8 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLink
        BMDReferenceStatus last_reference_status_;\r
        retry_task<bool> send_completion_;\r
 \r
+       tbb::atomic<int64_t>                            current_presentation_delay_;\r
+\r
 public:\r
        decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) \r
                : channel_index_(channel_index)\r
@@ -242,6 +249,7 @@ public:
                , last_reference_status_(static_cast<BMDReferenceStatus>(-1))\r
        {\r
                is_running_ = true;\r
+               current_presentation_delay_ = 0;\r
                                \r
                video_frame_buffer_.set_capacity(1);\r
 \r
@@ -383,11 +391,13 @@ public:
                \r
                try\r
                {\r
+                       auto dframe = reinterpret_cast<decklink_frame*>(completed_frame);\r
+                       current_presentation_delay_ = dframe->get_age_millis();\r
+\r
                        if(result == bmdOutputFrameDisplayedLate)\r
                        {\r
                                graph_->set_tag("late-frame");\r
                                video_scheduled_ += format_desc_.duration;\r
-                               auto dframe = reinterpret_cast<decklink_frame*>(completed_frame);\r
                                audio_scheduled_ += dframe->audio_data().size()/config_.num_out_channels();\r
                                //++video_scheduled_;\r
                                //audio_scheduled_ += format_desc_.audio_cadence[0];\r
@@ -645,6 +655,7 @@ public:
                info.add(L"low-latency", config_.low_latency);\r
                info.add(L"embedded-audio", config_.embedded_audio);\r
                info.add(L"low-latency", config_.low_latency);\r
+               info.add(L"presentation-frame-age", presentation_frame_age_millis());\r
                //info.add(L"internal-key", config_.internal_key);\r
                return info;\r
        }\r
@@ -658,6 +669,11 @@ public:
        {\r
                return 300 + config_.device_index;\r
        }\r
+\r
+       virtual int64_t presentation_frame_age_millis() const\r
+       {\r
+               return context_ ? context_->current_presentation_delay_ : 0;\r
+       }\r
 };     \r
 \r
 safe_ptr<core::frame_consumer> create_consumer(const core::parameters& params) \r
index 4c6d3b9685dad6c532a9f40bf19bc865acc201a8..200f579a16aa650117f8d22b1662356aad9aa9dd 100644 (file)
       </Command>\r
     </PreBuildEvent>\r
     <ClCompile>\r
-      <Optimization>Disabled</Optimization>\r
-      <InlineFunctionExpansion>Disabled</InlineFunctionExpansion>\r
+      <Optimization>MaxSpeed</Optimization>\r
+      <InlineFunctionExpansion>AnySuitable</InlineFunctionExpansion>\r
       <IntrinsicFunctions>true</IntrinsicFunctions>\r
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>\r
       <AdditionalIncludeDirectories>../;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>\r
index 75094136a73b9cbb5ddf0e216694aca153ae467d..ffd1b6589fe392a46e7c155470e3e4638f135674 100644 (file)
@@ -46,6 +46,7 @@
 \r
 #include <tbb/cache_aligned_allocator.h>\r
 #include <tbb/parallel_invoke.h>\r
+#include <tbb/atomic.h>\r
 \r
 #include <boost/range/algorithm.hpp>\r
 #include <boost/range/algorithm_ext.hpp>\r
@@ -245,6 +246,7 @@ struct ffmpeg_consumer : boost::noncopyable
 \r
        output_format                                                   output_format_;\r
        bool                                                                    key_only_;\r
+       tbb::atomic<int64_t>                                    current_encoding_delay_;\r
        \r
 public:\r
        ffmpeg_consumer(const std::string& filename, const core::video_format_desc& format_desc, std::vector<option> options, bool key_only, const core::channel_layout& audio_channel_layout)\r
@@ -260,6 +262,8 @@ public:
                , output_format_(format_desc, filename, options)\r
                , key_only_(key_only)\r
        {\r
+               current_encoding_delay_ = 0;\r
+\r
                // TODO: Ask stakeholders about case where file already exists.\r
                boost::filesystem2::remove(boost::filesystem2::wpath(env::media_folder() + widen(filename))); // Delete the file if it exists\r
 \r
@@ -618,7 +622,8 @@ public:
                        if (!key_only_)\r
                                encode_audio_frame(*frame);\r
 \r
-                       graph_->set_value("frame-time", frame_timer.elapsed()*format_desc_.fps*0.5);                    \r
+                       graph_->set_value("frame-time", frame_timer.elapsed()*format_desc_.fps*0.5);\r
+                       current_encoding_delay_ = frame->get_age_millis();\r
                });\r
        }\r
 \r
@@ -659,6 +664,11 @@ public:
        {\r
                format_desc_ = format_desc;\r
        }\r
+\r
+       virtual int64_t presentation_frame_age_millis() const override\r
+       {\r
+               return consumer_ ? consumer_->current_encoding_delay_ : 0;\r
+       }\r
        \r
        virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
index 87fcbbcc8f219a94f6955f08d4eeb20d94181b08..5421adbe2b128de42d264fdafaa81366284196bf 100644 (file)
@@ -275,14 +275,14 @@ public:
                ax_->Tick();\r
                if(ax_->InvalidRect())\r
                {\r
-                       fast_memclr(bmp_.data(), width_*height_*4);\r
-                       ax_->DrawControl(bmp_);\r
-               \r
                        core::pixel_format_desc desc;\r
                        desc.pix_fmt = core::pixel_format::bgra;\r
                        desc.planes.push_back(core::pixel_format_desc::plane(width_, height_, 4));\r
                        auto frame = frame_factory_->create_frame(this, desc);\r
 \r
+                       fast_memclr(bmp_.data(), width_*height_*4);\r
+                       ax_->DrawControl(bmp_);\r
+\r
                        if(frame->image_data().size() == static_cast<int>(width_*height_*4))\r
                        {\r
                                fast_memcpy(frame->image_data().begin(), bmp_.data(), width_*height_*4);\r
index b0e67a15e5d5a8a3ddf876ee64a75253cd562ced..42d45416f9d2d9054b817b61a94e15a0c8b9f647 100644 (file)
@@ -35,8 +35,6 @@
 #include <boost/date_time/posix_time/posix_time.hpp>\r
 #include <boost/thread.hpp>\r
 \r
-#include <tbb/concurrent_queue.h>\r
-\r
 #include <FreeImage.h>\r
 #include <vector>\r
 #include <algorithm>\r
@@ -79,6 +77,11 @@ public:
        {\r
                format_desc_ = format_desc;\r
        }\r
+\r
+       virtual int64_t presentation_frame_age_millis() const override\r
+       {\r
+               return 0;\r
+       }\r
        \r
        virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {                               \r
index 80dfbc2401828e5b83def55f7d1c4ae3874f0b0d..e91dd68a35eaa2b5721d3e88c68fa0fc514530c2 100644 (file)
@@ -106,7 +106,12 @@ public:
                }
                CASPAR_LOG(info) << print() << " Sucessfully Initialized.";
        }
-       
+
+       virtual int64_t presentation_frame_age_millis() const override
+       {
+               return 0;
+       }
+
        virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override
        {
                std::shared_ptr<audio_buffer_16> buffer;
index c30cbac5ed3a8a4e935ee65626263a033e30e610..b4f950f7cd95038083b360cc37a2ada0d9a48181 100644 (file)
@@ -143,6 +143,7 @@ struct ogl_consumer : boost::noncopyable
 \r
        boost::thread                   thread_;\r
        tbb::atomic<bool>               is_running_;\r
+       tbb::atomic<int64_t>    current_presentation_age_;\r
        \r
        ffmpeg::filter                  filter_;\r
 public:\r
@@ -197,6 +198,7 @@ public:
                screen_height_  = config_.windowed ? square_height_ : devmode.dmPelsHeight;\r
 \r
                is_running_ = true;\r
+               current_presentation_age_ = 0;\r
                thread_ = boost::thread([this]{run();});\r
        }\r
        \r
@@ -382,6 +384,8 @@ public:
 \r
                        wait_for_vblank_and_display(); // field2\r
                }\r
+\r
+               current_presentation_age_ = frame->get_age_millis();\r
        }\r
 \r
        void render(safe_ptr<AVFrame> av_frame, int image_data_size)\r
@@ -541,7 +545,12 @@ public:
                consumer_.reset(new ogl_consumer(config_, format_desc, channel_index));\r
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";   \r
        }\r
-       \r
+\r
+       virtual int64_t presentation_frame_age_millis() const override\r
+       {\r
+               return consumer_ ? consumer_->current_presentation_age_ : 0;\r
+       }\r
+\r
        virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                return consumer_->send(frame);\r
index 0ffd85cdd771461945bd466c067dda0113d976f0..1896ce7a99493d146e3e79c058e4f9abdb3d2599 100644 (file)
@@ -1807,6 +1807,31 @@ bool InfoCommand::DoExecute()
                        \r
                        boost::property_tree::write_xml(replyString, info, w);\r
                }\r
+               else if(_parameters.size() >= 2 && _parameters[1] == L"DELAY")\r
+               {\r
+                       replyString << L"201 INFO DELAY OK\r\n";\r
+                       boost::property_tree::wptree info;\r
+\r
+                       std::vector<std::wstring> split;\r
+                       boost::split(split, _parameters[0], boost::is_any_of("-"));\r
+                                       \r
+                       int layer = std::numeric_limits<int>::min();\r
+                       int channel = boost::lexical_cast<int>(split[0]) - 1;\r
+\r
+                       if(split.size() > 1)\r
+                               layer = boost::lexical_cast<int>(split[1]);\r
+                               \r
+                       if(layer == std::numeric_limits<int>::min())\r
+                       {       \r
+                               info.add_child(L"channel-delay", channels_.at(channel)->delay_info());\r
+                       }\r
+                       else\r
+                       {\r
+                               info.add_child(L"layer-delay", channels_.at(channel)->stage()->delay_info(layer).get())\r
+                                       .add(L"index", layer);\r
+                       }\r
+                       boost::property_tree::xml_parser::write_xml(replyString, info, w);\r
+               }\r
                else // channel\r
                {                       \r
                        if(_parameters.size() >= 1)\r
index 221bc63934c6e036741a3b43fc5dc332fbf2303c..a84f3dd82ac9094ecde0947c746d0df2364fe76b 100644 (file)
                 <key-only>false [true|false]</key-only>\r
             </bluefish>\r
             <system-audio></system-audio>\r
+            <synchronizing>\r
+                ... consumer1\r
+                ... consumer2\r
+            </synchronizing>\r
             <screen>\r
                 <device>[0..]</device>\r
                 <aspect-ratio>default [default|4:3|16:9]</aspect-ratio>\r
index 0fa59241848031f6e36557ed816c08e17ab4280b..a781482b503c7dc992bcaceace1841d44222ae99 100644 (file)
@@ -35,6 +35,7 @@
 #include <core/video_channel.h>\r
 #include <core/producer/stage.h>\r
 #include <core/consumer/output.h>\r
+#include <core/consumer/synchronizing/synchronizing_consumer.h>\r
 #include <core/thumbnail_generator.h>\r
 \r
 #include <modules/bluefish/bluefish.h>\r
@@ -216,6 +217,10 @@ struct server::implementation : boost::noncopyable
                                        on_consumer(ffmpeg::create_consumer(xml_consumer.second));                                              \r
                                else if (name == L"system-audio")\r
                                        on_consumer(oal::create_consumer());\r
+                               else if (name == L"synchronizing")\r
+                                       on_consumer(make_safe<core::synchronizing_consumer>(\r
+                                                       create_consumers<core::frame_consumer>(\r
+                                                                       xml_consumer.second)));\r
                                else if (name != L"<xmlcomment>")\r
                                        CASPAR_LOG(warning) << "Invalid consumer: " << widen(name);     \r
                        }\r