]> git.sesse.net Git - casparcg/commitdiff
2.0.2: core: Simplified pipeline token handling. Stage no longer blocks while waiting...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 27 Nov 2011 17:11:31 +0000 (17:11 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 27 Nov 2011 17:11:31 +0000 (17:11 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.2@1681 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

common/common.vcxproj
common/common.vcxproj.filters
common/concurrency/governor.h [deleted file]
core/consumer/output.cpp
core/consumer/output.h
core/mixer/mixer.cpp
core/mixer/mixer.h
core/producer/stage.cpp
core/producer/stage.h
core/video_channel.cpp

index 09543600bc06446a05a46fb8b3e66f6b5f9130d2..311391eef9e55cce43ce6e0e7fb47729b50da3b9 100644 (file)
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="concurrency\com_context.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
-    <ClInclude Include="concurrency\governor.h" />\r
     <ClInclude Include="concurrency\target.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
     <ClInclude Include="exception\exceptions.h" />\r
index 6b88e2b7d4126bfd2697d026de540c9413b421c3..9986cc11a205c5deba1168fda5416a822f9a3a0a 100644 (file)
     <ClInclude Include="concurrency\target.h">\r
       <Filter>source\concurrency</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="concurrency\governor.h">\r
-      <Filter>source\concurrency</Filter>\r
-    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
diff --git a/common/concurrency/governor.h b/common/concurrency/governor.h
deleted file mode 100644 (file)
index c2dabc1..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-#pragma once\r
-\r
-#include <memory>\r
-\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-\r
-#include <tbb/atomic.h>\r
-\r
-namespace caspar {\r
-\r
-typedef std::shared_ptr<void> ticket;\r
-\r
-namespace detail\r
-{      \r
-       class governor_impl : public std::enable_shared_from_this<governor_impl>\r
-       {\r
-               boost::mutex                            mutex_;\r
-               boost::condition_variable       cond_;\r
-               tbb::atomic<int>                        count_;\r
-       public:\r
-               governor_impl(size_t count) \r
-               {\r
-                       count_ = count;\r
-               }\r
-\r
-               ticket acquire()\r
-               {\r
-                       {\r
-                               boost::unique_lock<boost::mutex> lock(mutex_);\r
-                               while(count_ < 0)\r
-                                       cond_.wait(lock);\r
-                               --count_;\r
-                       }\r
-\r
-                       auto self = shared_from_this();\r
-                       return ticket(nullptr, [self](void*)\r
-                       {\r
-                               ++self->count_;\r
-                               self->cond_.notify_one();\r
-                       });\r
-               }\r
-\r
-               void cancel()\r
-               {\r
-                       count_ = std::numeric_limits<int>::max();                       \r
-                       cond_.notify_all();\r
-               }\r
-       };\r
-}\r
-\r
-class governor\r
-{\r
-       std::shared_ptr<detail::governor_impl> impl_;\r
-public:\r
-\r
-       governor(size_t count) \r
-               : impl_(new detail::governor_impl(count))\r
-       {\r
-       }\r
-\r
-       ticket acquire()\r
-       {\r
-               return impl_->acquire();\r
-       }\r
-\r
-       void cancel()\r
-       {\r
-               impl_->cancel();\r
-       }\r
-\r
-};\r
-\r
-}
\ No newline at end of file
index 2b488cf1667307fa62b8357e79ebe02263d9b218..4da6f05bdc65121affd0388b42c21ce4458b58a6 100644 (file)
@@ -142,7 +142,7 @@ public:
                return std::make_pair(min, max);\r
        }\r
 \r
-       void send(const std::pair<safe_ptr<read_frame>, ticket>& packet)\r
+       void send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& packet)\r
        {\r
                executor_.begin_invoke([=]\r
                {\r
@@ -230,6 +230,6 @@ private:
 output::output(const safe_ptr<diagnostics::graph>& graph, const video_format_desc& format_desc, int channel_index) : impl_(new implementation(graph, format_desc, channel_index)){}\r
 void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
 void output::remove(int index){impl_->remove(index);}\r
-void output::send(const std::pair<safe_ptr<read_frame>, ticket>& frame) {impl_->send(frame); }\r
+void output::send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) {impl_->send(frame); }\r
 void output::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
 }}
\ No newline at end of file
index c009f4f812f05f489d97788d1ad599e9d5e268da..2305a974a2b15cc9dc6635779d225ef7835f9d19 100644 (file)
 \r
 #include <common/memory/safe_ptr.h>\r
 #include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
 #include <common/diagnostics/graph.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
 namespace caspar { namespace core {\r
        \r
-class output : public target<std::pair<safe_ptr<read_frame>, ticket>>, boost::noncopyable\r
+class output : public target<std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>>, boost::noncopyable\r
 {\r
 public:\r
        explicit output(const safe_ptr<diagnostics::graph>& graph, const video_format_desc& format_desc, int channel_index);\r
 \r
        // target\r
        \r
-       virtual void send(const std::pair<safe_ptr<read_frame>, ticket>& frame) override;\r
+       virtual void send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) override;\r
 \r
        // output\r
 \r
index 44748fcb9589cf25b283d63f5f47f7f86a7e2aa5..beb1665b64128ac8c7c0568026cb94ccf95e9ac8 100644 (file)
@@ -114,7 +114,7 @@ public:
                graph_->set_color("mix-time", diagnostics::color(1.0f, 0.0f, 0.9f));\r
        }\r
        \r
-       void send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>& packet)\r
+       void send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>& packet)\r
        {                       \r
                executor_.begin_invoke([=]\r
                {               \r
@@ -238,7 +238,7 @@ public:
        \r
 mixer::mixer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl) \r
        : impl_(new implementation(graph, target, format_desc, ogl)){}\r
-void mixer::send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>& frames){ impl_->send(frames);}\r
+void mixer::send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>& frames){ impl_->send(frames);}\r
 core::video_format_desc mixer::get_video_format_desc() const { return impl_->get_video_format_desc(); }\r
 safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); }                \r
 safe_ptr<core::write_frame> mixer::create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt)\r
index 1a15466d9c20911a4bdde9f5c3fc2e1379e44fb2..4b52ac8199983429f22f1b390ae64467165142da 100644 (file)
@@ -25,7 +25,6 @@
 \r
 #include <common/memory/safe_ptr.h>\r
 #include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
 #include <common/diagnostics/graph.h>\r
 \r
 #include <map>\r
@@ -43,16 +42,16 @@ class ogl_device;
 struct frame_transform;\r
 struct pixel_format;\r
 \r
-class mixer : public target<std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>>, public core::frame_factory\r
+class mixer : public target<std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>>, public core::frame_factory\r
 {\r
 public:        \r
-       typedef target<std::pair<safe_ptr<read_frame>, ticket>> target_t;\r
+       typedef target<std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>> target_t;\r
 \r
        explicit mixer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl);\r
                \r
        // target\r
 \r
-       virtual void send(const std::pair<std::map<int, safe_ptr<basic_frame>>, ticket>& frames) override; \r
+       virtual void send(const std::pair<std::map<int, safe_ptr<basic_frame>>, std::shared_ptr<void>>& frames) override; \r
                \r
        // mixer\r
 \r
index adcca8b064cc5d141926ef2a078af8a669e3e11d..d52d98379991a260e55bdb5e59b500dfc8a3c3aa 100644 (file)
@@ -28,8 +28,6 @@
 #include "frame/frame_factory.h"\r
 \r
 #include <common/concurrency/executor.h>\r
-#include <common/concurrency/governor.h>\r
-#include <common/env.h>\r
 \r
 #include <boost/foreach.hpp>\r
 #include <boost/timer.hpp>\r
@@ -40,7 +38,8 @@
 \r
 namespace caspar { namespace core {\r
 \r
-struct stage::implementation : boost::noncopyable\r
+struct stage::implementation : public std::enable_shared_from_this<implementation>\r
+                                                        , boost::noncopyable\r
 {              \r
        safe_ptr<diagnostics::graph>    graph_;\r
        safe_ptr<stage::target_t>               target_;\r
@@ -51,7 +50,6 @@ struct stage::implementation : boost::noncopyable
 \r
        std::map<int, layer>                    layers_;        \r
 \r
-       governor                                                governor_;\r
        executor                                                executor_;\r
 public:\r
        implementation(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<stage::target_t>& target, const video_format_desc& format_desc)  \r
@@ -59,26 +57,22 @@ public:
                , format_desc_(format_desc)\r
                , target_(target)\r
                , executor_(L"stage")\r
-               , governor_(std::max(1, env::properties().get("configuration.pipeline-tokens", 2)))\r
        {\r
                graph_->add_guide("tick-time", 0.5f);   \r
                graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));   \r
                graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
-\r
-               executor_.begin_invoke([this]{tick();});\r
        }\r
 \r
-       ~implementation()\r
+       void spawn_token()\r
        {\r
-               governor_.cancel();\r
+               std::weak_ptr<implementation> self = shared_from_this();\r
+               executor_.begin_invoke([=]{tick(self);});\r
        }\r
-                                               \r
-       void tick()\r
+                                                       \r
+       void tick(const std::weak_ptr<implementation>& self)\r
        {               \r
                try\r
                {\r
-                       auto ticket = governor_.acquire();\r
-\r
                        produce_timer_.restart();\r
 \r
                        std::map<int, safe_ptr<basic_frame>> frames;\r
@@ -93,6 +87,13 @@ public:
                        \r
                        graph_->update_value("produce-time", produce_timer_.elapsed()*format_desc_.fps*0.5);\r
                        \r
+                       std::shared_ptr<void> ticket(nullptr, [self](void*)\r
+                       {\r
+                               auto self2 = self.lock();\r
+                               if(self2)                               \r
+                                       self2->executor_.begin_invoke([=]{tick(self);});                                \r
+                       });\r
+\r
                        target_->send(std::make_pair(frames, ticket));\r
 \r
                        graph_->update_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5);\r
@@ -103,7 +104,6 @@ public:
                        layers_.clear();\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                }               \r
-               executor_.begin_invoke([this]{tick();});\r
        }\r
 \r
        void load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta)\r
@@ -224,6 +224,7 @@ public:
 };\r
 \r
 stage::stage(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc) : impl_(new implementation(graph, target, format_desc)){}\r
+void stage::spawn_token(){impl_->spawn_token();}\r
 void stage::swap(stage& other){impl_->swap(other);}\r
 void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
 void stage::pause(int index){impl_->pause(index);}\r
index 175129f8193908d05a96633c5053302c144d8685..13e9d046ff019b959fa115f8402507e6bd532727 100644 (file)
@@ -24,7 +24,6 @@
 \r
 #include <common/memory/safe_ptr.h>\r
 #include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
 #include <common/diagnostics/graph.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
@@ -37,12 +36,14 @@ struct layer_status;
 class stage : boost::noncopyable\r
 {\r
 public:\r
-       typedef target<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket>> target_t;\r
+       typedef target<std::pair<std::map<int, safe_ptr<basic_frame>>, std::shared_ptr<void>>> target_t;\r
 \r
        explicit stage(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc);\r
        \r
        // stage\r
 \r
+       void spawn_token();\r
+\r
        void swap(stage& other);\r
                        \r
        void load(int index, const safe_ptr<frame_producer>& producer, bool preview = false, int auto_play_delta = -1);\r
index d5d2ecd81d8343f6bba41f40832e7491d7ce45ad..67eebfde816cfdef6da6b4031ec5cda55d7f0d9f 100644 (file)
@@ -30,6 +30,7 @@
 #include "producer/stage.h"\r
 \r
 #include <common/diagnostics/graph.h>\r
+#include <common/env.h>\r
 \r
 #include <string>\r
 \r
@@ -58,6 +59,9 @@ public:
                graph_->set_text(print());\r
                diagnostics::register_graph(graph_);\r
 \r
+               for(int n = 0; n < std::max(1, env::properties().get("configuration.pipeline-tokens", 2)); ++n)\r
+                       stage_->spawn_token();\r
+\r
                CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
        }\r
        \r