]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 24 Oct 2011 12:28:43 +0000 (12:28 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 24 Oct 2011 12:28:43 +0000 (12:28 +0000)
16 files changed:
common/common.vcxproj
common/common.vcxproj.filters
common/concurrency/governor.h [new file with mode: 0644]
common/concurrency/message.h [deleted file]
core/consumer/output.cpp
core/consumer/output.h
core/mixer/mixer.cpp
core/mixer/mixer.h
core/producer/stage.cpp
core/producer/stage.h
core/video_channel.cpp
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/util.h
modules/ffmpeg/producer/video/video_decoder.cpp

index 83a81b32812978cac1c31304ecdffd07a58080d6..807ff5f93294aa92492481f9d7da5a34db9cf105 100644 (file)
   <ItemGroup>\r
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
-    <ClInclude Include="concurrency\message.h" />\r
+    <ClInclude Include="concurrency\governor.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
     <ClInclude Include="exception\exceptions.h" />\r
     <ClInclude Include="exception\win32_exception.h" />\r
index ef626f0123b5f99ab987bceaeee22524d700c3fc..43c4f0eef2a11c5af7d4a65676bf0f3eadf3b1aa 100644 (file)
     <ClInclude Include="utility\move_on_copy.h">\r
       <Filter>source\utility</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="concurrency\message.h">\r
+    <ClInclude Include="concurrency\governor.h">\r
       <Filter>source\concurrency</Filter>\r
     </ClInclude>\r
   </ItemGroup>\r
diff --git a/common/concurrency/governor.h b/common/concurrency/governor.h
new file mode 100644 (file)
index 0000000..30485a2
--- /dev/null
@@ -0,0 +1,132 @@
+#pragma once\r
+\r
+#include "../memory/safe_ptr.h"\r
+\r
+#include <concrt.h>\r
+#include <concurrent_queue.h>\r
+#include <tbb/atomic.h>\r
+\r
+#include <boost/noncopyable.hpp>\r
+#include <boost/any.hpp>\r
+\r
+namespace caspar {\r
+       \r
+//namespace Concurrency\r
+//{    \r
+//     class semaphore\r
+//     {\r
+//     public:\r
+//             explicit semaphore(LONG capacity)\r
+//                       : _semaphore_count(capacity)\r
+//             {\r
+//             }\r
+//\r
+//             // Acquires access to the semaphore.\r
+//             void acquire()\r
+//             {\r
+//                     // The capacity of the semaphore is exceeded when the semaphore count \r
+//                     // falls below zero. When this happens, add the current context to the \r
+//                     // back of the wait queue and block the current context.\r
+//                     if (InterlockedDecrement(&_semaphore_count) < 0)\r
+//                     {\r
+//                             _waiting_contexts.push(Concurrency::Context::CurrentContext());\r
+//                             Concurrency::Context::Block();\r
+//                     }\r
+//             }\r
+//\r
+//             // Releases access to the semaphore.\r
+//             void release()\r
+//             {\r
+//               // If the semaphore count is negative, unblock the first waiting context.\r
+//                     if (InterlockedIncrement(&_semaphore_count) <= 0)\r
+//                     {\r
+//                      // A call to acquire might have decremented the counter, but has not\r
+//                      // yet finished adding the context to the queue. \r
+//                      // Create a spin loop that waits for the context to become available.\r
+//                             Concurrency:: Context* waiting = NULL;\r
+//                             while(!_waiting_contexts.try_pop(waiting))\r
+//                             {\r
+//                                     Concurrency::wait(0);\r
+//                             }\r
+//\r
+//                             // Unblock the context.\r
+//                              waiting->Unblock();\r
+//                     }\r
+//             }\r
+//\r
+//             void release_all()\r
+//             {\r
+//                     Concurrency:: Context* waiting = NULL;\r
+//                     while(_waiting_contexts.try_pop(waiting))\r
+//                     {\r
+//                             InterlockedIncrement(&_semaphore_count);\r
+//                             waiting->Unblock();\r
+//                     }\r
+//             }\r
+//\r
+//     private:\r
+//             // The semaphore count.\r
+//             LONG _semaphore_count;\r
+//\r
+//             // A concurrency-safe queue of contexts that must wait to \r
+//             // acquire the semaphore.\r
+//             Concurrency::concurrent_queue<Concurrency::Context*> _waiting_contexts;\r
+//        \r
+//             semaphore const &operator =(semaphore const&);  // no assignment operator\r
+//             semaphore(semaphore const &);                   // no copy constructor\r
+//     };\r
+//}\r
+       \r
+typedef safe_ptr<int> ticket_t;\r
+\r
+class governor : boost::noncopyable\r
+{\r
+       tbb::atomic<int> count_;\r
+       Concurrency::concurrent_queue<Concurrency::Context*> waiting_contexts_;\r
+\r
+       void acquire_ticket(Concurrency::Context* context)\r
+       {\r
+               if (--count_ < 0)\r
+               {\r
+                       waiting_contexts_.push(context);\r
+                       context->Block();\r
+               }\r
+       }\r
+\r
+       void release_ticket()\r
+       {\r
+               if(++count_ <= 0)\r
+               {\r
+                       Concurrency:: Context* waiting = NULL;\r
+                       while(!waiting_contexts_.try_pop(waiting))\r
+                               Concurrency::wait(0);\r
+                       waiting->Unblock();\r
+               }\r
+       }\r
+\r
+public:\r
+\r
+       governor(size_t capacity) \r
+       {\r
+               count_ = capacity;\r
+       }\r
+       \r
+       ticket_t acquire()\r
+       {\r
+               acquire_ticket(Concurrency::Context::CurrentContext());\r
+               \r
+               return safe_ptr<int>(new int, [this](int* p)\r
+               {\r
+                       delete p;\r
+                       release_ticket();\r
+               });\r
+       }\r
+\r
+       void cancel()\r
+       {\r
+               while(count_ <= 0)\r
+                       release_ticket();\r
+       }\r
+};\r
+\r
+}
\ No newline at end of file
diff --git a/common/concurrency/message.h b/common/concurrency/message.h
deleted file mode 100644 (file)
index 8133b2e..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-#pragma once\r
-\r
-#include "../memory/safe_ptr.h"\r
-\r
-#include <agents.h>\r
-#include <semaphore.h>\r
-\r
-#include <boost/noncopyable.hpp>\r
-\r
-namespace caspar {\r
-       \r
-class token : boost::noncopyable\r
-{\r
-       std::shared_ptr<Concurrency::semaphore> semaphore_;\r
-public:\r
-\r
-       token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
-               : semaphore_(semaphore)\r
-       {\r
-               semaphore_->acquire();\r
-       }\r
-\r
-       token(token&& other)\r
-               : semaphore_(std::move(other.semaphore_))\r
-       {\r
-       }\r
-\r
-       ~token()\r
-       {\r
-               if(semaphore_)\r
-                       semaphore_->release();\r
-       }\r
-};\r
-\r
-template<typename T>\r
-class message\r
-{\r
-       T payload_;\r
-       token token_;\r
-public:\r
-       message(const T& payload, token&& token)\r
-               : payload_(payload)\r
-               , token_(std::move(token))\r
-       {\r
-       }\r
-       \r
-       template<typename U>\r
-       safe_ptr<message<U>> transfer(const U& payload)\r
-       {\r
-               return make_safe<message<U>>(payload, std::move(token_));\r
-       }\r
-\r
-       T& value() {return payload_;}\r
-       const T& value() const {return payload_;}\r
-};\r
-\r
-}
\ No newline at end of file
index 99dcb0c35206ca9499a982c55b6000c51cbd9bbb..a33459621e8311e65355ea5998c304fe67ff24b6 100644 (file)
@@ -52,8 +52,8 @@ struct output::implementation
        \r
        high_prec_timer timer_;\r
 \r
-       critical_section                                                                mutex_;\r
-       call<safe_ptr<message<safe_ptr<read_frame>>>>   output_;\r
+       critical_section                                mutex_;\r
+       call<output::source_element_t>  output_;\r
                \r
 public:\r
        implementation(output::source_t& source, const video_format_desc& format_desc) \r
@@ -93,9 +93,9 @@ public:
                }\r
        }\r
                                                \r
-       void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+       void execute(const output::source_element_t& element)\r
        {       \r
-               auto frame = msg->value();\r
+               auto frame = element->first;\r
 \r
                {\r
                        critical_section::scoped_lock lock(mutex_);             \r
index a05407ad291d882291ee8480fbc2f5d545e7f2ea..e719595b792be8eb67643d4a019626762bdf20a3 100644 (file)
@@ -21,7 +21,7 @@
 \r
 #include "../consumer/frame_consumer.h"\r
 \r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
@@ -35,7 +35,8 @@ class video_channel_context;
 class output : boost::noncopyable\r
 {\r
 public:\r
-       typedef Concurrency::ISource<safe_ptr<message<safe_ptr<read_frame>>>> source_t;\r
+       typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>>       source_element_t;\r
+       typedef Concurrency::ISource<source_element_t>                                          source_t;\r
 \r
        explicit output(source_t& source, const video_format_desc& format_desc);\r
 \r
index 2df61eb4745e8b23d0ee98d3cfc0fe1eeef76777..f4d6dbd022d3e871d62dd6904c8ccbde31b4c286 100644 (file)
@@ -96,8 +96,7 @@ struct mixer::implementation : boost::noncopyable
        std::unordered_map<int, blend_mode::type> blend_modes_;\r
                \r
        critical_section                        mutex_;\r
-       Concurrency::transformer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>, \r
-                                                        safe_ptr<message<safe_ptr<core::read_frame>>>> mixer_;\r
+       Concurrency::transformer<mixer::source_element_t, mixer::target_element_t> mixer_;\r
 public:\r
        implementation(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) \r
                : format_desc_(format_desc)\r
@@ -110,9 +109,9 @@ public:
                source.link_target(&mixer_);\r
        }\r
                \r
-       safe_ptr<message<safe_ptr<core::read_frame>>> mix(const safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>& msg)\r
+       mixer::target_element_t mix(const mixer::source_element_t& element)\r
        {               \r
-               auto frames = msg->value();\r
+               auto frames = element->first;\r
 \r
                auto frame = make_safe<read_frame>();\r
 \r
@@ -159,7 +158,7 @@ public:
                        Concurrency::wait(20);\r
                }\r
 \r
-               return msg->transfer<safe_ptr<core::read_frame>>(std::move(frame));     \r
+               return mixer::target_element_t(std::make_pair(std::move(frame), element->second));      \r
        }\r
                                                \r
        boost::unique_future<safe_ptr<core::write_frame>> async_create_frame(const void* tag, const core::pixel_format_desc& desc)\r
index f2a5f14ff7c133649fb401739e167a8657d5efd1..6193b69d6a6c6b519083710bd3997f9aa7620894 100644 (file)
@@ -23,7 +23,7 @@
 \r
 #include "../producer/frame/frame_factory.h"\r
 \r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
 #include <common/memory/safe_ptr.h>\r
 \r
 #include <agents.h>\r
@@ -49,8 +49,11 @@ class mixer : public core::frame_factory
 {\r
 public:        \r
        \r
-       typedef Concurrency::ISource<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>>   source_t;\r
-       typedef Concurrency::ITarget<safe_ptr<message<safe_ptr<core::read_frame>>>>                             target_t;\r
+       typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> source_element_t;\r
+       typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>>                       target_element_t;\r
+\r
+       typedef Concurrency::ISource<source_element_t>                                                          source_t;\r
+       typedef Concurrency::ITarget<target_element_t>                                                          target_t;\r
 \r
        explicit mixer(source_t& source, target_t& target, const video_format_desc& format_desc, ogl_device& ogl);\r
                                                \r
index 7193787befd5afcdcf49b1e3f95ca0ea72e62710..f9113d4ff965b9175dab8d5193513a382119933e 100644 (file)
@@ -48,11 +48,11 @@ struct stage::implementation : public agent, public boost::noncopyable
        Concurrency::critical_section                           mutex_;\r
        stage::target_t&                                                        target_;\r
        std::map<int, layer>                                            layers_;        \r
-       safe_ptr<semaphore>                                                     semaphore_;\r
+       governor&                                                                       governor_;\r
 public:\r
-       implementation(stage::target_t& target, const safe_ptr<semaphore>& semaphore)  \r
+       implementation(stage::target_t& target, governor& governor)  \r
                : target_(target)\r
-               , semaphore_(semaphore)\r
+               , governor_(governor)\r
        {\r
                start();\r
        }\r
@@ -60,7 +60,7 @@ public:
        ~implementation()\r
        {\r
                send(is_running_, false);\r
-               semaphore_->release();\r
+               governor_.cancel();\r
                agent::wait(this);\r
        }\r
 \r
@@ -84,7 +84,7 @@ public:
                                        });\r
                                }\r
 \r
-                               send(target_, make_safe<message<decltype(frames)>>(frames, token(semaphore_)));\r
+                               send(target_, stage::target_element_t(std::make_pair(frames, governor_.acquire())));\r
                        }\r
                        catch(...)\r
                        {\r
@@ -223,7 +223,7 @@ public:
 \r
 };\r
 \r
-stage::stage(target_t& target, const safe_ptr<semaphore>& semaphore) : impl_(new implementation(target, semaphore)){}\r
+stage::stage(target_t& target, governor& governor) : impl_(new implementation(target, governor)){}\r
 void stage::swap(stage& other){impl_->swap(other);}\r
 void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
 void stage::pause(int index){impl_->pause(index);}\r
index bcfd149cc405c037e9187b31ca53f5356d0fba81..3a925e80982747cf6f727d36da39b4b6cdebf944 100644 (file)
 #include "frame_producer.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
 \r
 #include <agents.h>\r
-#include <semaphore.h>\r
 \r
 namespace caspar { namespace core {\r
 \r
@@ -39,9 +38,11 @@ struct layer_status;
 class stage : boost::noncopyable\r
 {\r
 public:\r
-       typedef Concurrency::ITarget<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> target_t;\r
+       \r
+       typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> target_element_t;\r
+       typedef Concurrency::ITarget<target_element_t> target_t;\r
 \r
-       explicit stage(target_t& target, const safe_ptr<Concurrency::semaphore>& semaphore);\r
+       explicit stage(target_t& target, governor& governor);\r
 \r
        void swap(stage& other);\r
                        \r
index 77057a7ab311c230eb089041a636a8bb6cdeda7c..7e1977a297934957338255e507c47f34d8b4ec8f 100644 (file)
 #include "mixer/mixer.h"\r
 #include "producer/stage.h"\r
 \r
-#include <common/concurrency/executor.h>\r
+#include <common/concurrency/governor.h>\r
 #include <common/diagnostics/graph.h>\r
 \r
 #include "mixer/gpu/ogl_device.h"\r
 \r
 #include <agents_extras.h>\r
-#include <semaphore.h>\r
 \r
 #include <boost/timer.hpp>\r
 \r
@@ -48,12 +47,12 @@ namespace caspar { namespace core {
 \r
 struct video_channel::implementation : boost::noncopyable\r
 {\r
-       unbounded_buffer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>>       stage_frames_;\r
-       unbounded_buffer<safe_ptr<message<safe_ptr<read_frame>>>>                                       mixer_frames_;\r
+       unbounded_buffer<safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>>>   stage_frames_;\r
+       unbounded_buffer<safe_ptr<std::pair<safe_ptr<read_frame>, ticket_t>>>                                   mixer_frames_;\r
        \r
        const video_format_desc                         format_desc_;\r
        \r
-       safe_ptr<semaphore>                                     semaphore_;\r
+       governor                                                        governor_;\r
        safe_ptr<caspar::core::output>          output_;\r
        safe_ptr<caspar::core::mixer>           mixer_;\r
        safe_ptr<caspar::core::stage>           stage_;\r
@@ -67,10 +66,10 @@ public:
        implementation(int index, const video_format_desc& format_desc, ogl_device& ogl)  \r
                : graph_(diagnostics::create_graph(narrow(print()), false))\r
                , format_desc_(format_desc)\r
-               , semaphore_(make_safe<semaphore>(3))\r
+               , governor_(3)\r
                , output_(new caspar::core::output(mixer_frames_, format_desc))\r
                , mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
-               , stage_(new caspar::core::stage(stage_frames_, semaphore_))    \r
+               , stage_(new caspar::core::stage(stage_frames_, governor_))     \r
        {\r
                graph_->add_guide("produce-time", 0.5f);        \r
                graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
index c2335bf4f6850281a4096bbee759968a91858992..1afdb7f34157aa1e3823b65fe24b1757320f6a12 100644 (file)
@@ -42,9 +42,6 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <connect.h>\r
-#include <semaphore.h>\r
-\r
 using namespace Concurrency;\r
 \r
 namespace caspar { namespace ffmpeg {\r
@@ -77,7 +74,7 @@ public:
        {                               \r
                CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
 \r
-               Concurrency::connect(source, source_);\r
+               source.link_target(&source_);\r
 \r
                start();\r
        }\r
index 1c2a17adfd502d38a26094e70444dda5680c9e48..46ed171aba1710e729d127993f26bc01f997e815 100644 (file)
@@ -9,7 +9,6 @@
 #include <boost/noncopyable.hpp>\r
 \r
 #include <agents.h>\r
-#include <semaphore.h>\r
 \r
 #include <vector>\r
 \r
index 740748ea12594e101fa2b4290bd2e5a7bb531ced..534abfc548c41d549cf1678c48107c0c174f4774 100644 (file)
@@ -37,7 +37,6 @@
 \r
 #include <agents.h>\r
 #include <concrt_extras.h>\r
-#include <semaphore.h>\r
 \r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
index 976ffefcf47ca064d8519fe8dacc9528635195c9..13a419db4b22d42d9cd1ebcfa50bfeef4ac5a159 100644 (file)
@@ -20,9 +20,6 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <agents.h>\r
-#include <semaphore.h>\r
-\r
 struct AVFrame;\r
 struct AVFormatContext;\r
 struct AVPacket;\r
index 25cd6f3fcad94b114dc86317949d8bd9d7a8d5b9..9401421aacd49fa185852c52c8a78c04e51aa775 100644 (file)
@@ -42,9 +42,6 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <connect.h>\r
-#include <semaphore.h>\r
-\r
 #include <tbb/scalable_allocator.h>\r
 \r
 using namespace Concurrency;\r
@@ -86,7 +83,7 @@ public:
                CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
                CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
 \r
-               Concurrency::connect(source, source_);\r
+               source.link_target(&source_);\r
 \r
                start();\r
        }\r