<ItemGroup>\r
<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
- <ClInclude Include="concurrency\message.h" />\r
+ <ClInclude Include="concurrency\governor.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="exception\exceptions.h" />\r
<ClInclude Include="exception\win32_exception.h" />\r
<ClInclude Include="utility\move_on_copy.h">\r
<Filter>source\utility</Filter>\r
</ClInclude>\r
- <ClInclude Include="concurrency\message.h">\r
+ <ClInclude Include="concurrency\governor.h">\r
<Filter>source\concurrency</Filter>\r
</ClInclude>\r
</ItemGroup>\r
--- /dev/null
+#pragma once\r
+\r
+#include "../memory/safe_ptr.h"\r
+\r
+#include <concrt.h>\r
+#include <concurrent_queue.h>\r
+#include <tbb/atomic.h>\r
+\r
+#include <boost/noncopyable.hpp>\r
+#include <boost/any.hpp>\r
+\r
+namespace caspar {\r
+ \r
+//namespace Concurrency\r
+//{ \r
+// class semaphore\r
+// {\r
+// public:\r
+// explicit semaphore(LONG capacity)\r
+// : _semaphore_count(capacity)\r
+// {\r
+// }\r
+//\r
+// // Acquires access to the semaphore.\r
+// void acquire()\r
+// {\r
+// // The capacity of the semaphore is exceeded when the semaphore count \r
+// // falls below zero. When this happens, add the current context to the \r
+// // back of the wait queue and block the current context.\r
+// if (InterlockedDecrement(&_semaphore_count) < 0)\r
+// {\r
+// _waiting_contexts.push(Concurrency::Context::CurrentContext());\r
+// Concurrency::Context::Block();\r
+// }\r
+// }\r
+//\r
+// // Releases access to the semaphore.\r
+// void release()\r
+// {\r
+// // If the semaphore count is negative, unblock the first waiting context.\r
+// if (InterlockedIncrement(&_semaphore_count) <= 0)\r
+// {\r
+// // A call to acquire might have decremented the counter, but has not\r
+// // yet finished adding the context to the queue. \r
+// // Create a spin loop that waits for the context to become available.\r
+// Concurrency:: Context* waiting = NULL;\r
+// while(!_waiting_contexts.try_pop(waiting))\r
+// {\r
+// Concurrency::wait(0);\r
+// }\r
+//\r
+// // Unblock the context.\r
+// waiting->Unblock();\r
+// }\r
+// }\r
+//\r
+// void release_all()\r
+// {\r
+// Concurrency:: Context* waiting = NULL;\r
+// while(_waiting_contexts.try_pop(waiting))\r
+// {\r
+// InterlockedIncrement(&_semaphore_count);\r
+// waiting->Unblock();\r
+// }\r
+// }\r
+//\r
+// private:\r
+// // The semaphore count.\r
+// LONG _semaphore_count;\r
+//\r
+// // A concurrency-safe queue of contexts that must wait to \r
+// // acquire the semaphore.\r
+// Concurrency::concurrent_queue<Concurrency::Context*> _waiting_contexts;\r
+// \r
+// semaphore const &operator =(semaphore const&); // no assignment operator\r
+// semaphore(semaphore const &); // no copy constructor\r
+// };\r
+//}\r
+ \r
+typedef safe_ptr<int> ticket_t;\r
+\r
+class governor : boost::noncopyable\r
+{\r
+ tbb::atomic<int> count_;\r
+ Concurrency::concurrent_queue<Concurrency::Context*> waiting_contexts_;\r
+\r
+ void acquire_ticket(Concurrency::Context* context)\r
+ {\r
+ if (--count_ < 0)\r
+ {\r
+ waiting_contexts_.push(context);\r
+ context->Block();\r
+ }\r
+ }\r
+\r
+ void release_ticket()\r
+ {\r
+ if(++count_ <= 0)\r
+ {\r
+ Concurrency:: Context* waiting = NULL;\r
+ while(!waiting_contexts_.try_pop(waiting))\r
+ Concurrency::wait(0);\r
+ waiting->Unblock();\r
+ }\r
+ }\r
+\r
+public:\r
+\r
+ governor(size_t capacity) \r
+ {\r
+ count_ = capacity;\r
+ }\r
+ \r
+ ticket_t acquire()\r
+ {\r
+ acquire_ticket(Concurrency::Context::CurrentContext());\r
+ \r
+ return safe_ptr<int>(new int, [this](int* p)\r
+ {\r
+ delete p;\r
+ release_ticket();\r
+ });\r
+ }\r
+\r
+ void cancel()\r
+ {\r
+ while(count_ <= 0)\r
+ release_ticket();\r
+ }\r
+};\r
+\r
+}
\ No newline at end of file
+++ /dev/null
-#pragma once\r
-\r
-#include "../memory/safe_ptr.h"\r
-\r
-#include <agents.h>\r
-#include <semaphore.h>\r
-\r
-#include <boost/noncopyable.hpp>\r
-\r
-namespace caspar {\r
- \r
-class token : boost::noncopyable\r
-{\r
- std::shared_ptr<Concurrency::semaphore> semaphore_;\r
-public:\r
-\r
- token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
- : semaphore_(semaphore)\r
- {\r
- semaphore_->acquire();\r
- }\r
-\r
- token(token&& other)\r
- : semaphore_(std::move(other.semaphore_))\r
- {\r
- }\r
-\r
- ~token()\r
- {\r
- if(semaphore_)\r
- semaphore_->release();\r
- }\r
-};\r
-\r
-template<typename T>\r
-class message\r
-{\r
- T payload_;\r
- token token_;\r
-public:\r
- message(const T& payload, token&& token)\r
- : payload_(payload)\r
- , token_(std::move(token))\r
- {\r
- }\r
- \r
- template<typename U>\r
- safe_ptr<message<U>> transfer(const U& payload)\r
- {\r
- return make_safe<message<U>>(payload, std::move(token_));\r
- }\r
-\r
- T& value() {return payload_;}\r
- const T& value() const {return payload_;}\r
-};\r
-\r
-}
\ No newline at end of file
\r
high_prec_timer timer_;\r
\r
- critical_section mutex_;\r
- call<safe_ptr<message<safe_ptr<read_frame>>>> output_;\r
+ critical_section mutex_;\r
+ call<output::source_element_t> output_;\r
\r
public:\r
implementation(output::source_t& source, const video_format_desc& format_desc) \r
}\r
}\r
\r
- void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+ void execute(const output::source_element_t& element)\r
{ \r
- auto frame = msg->value();\r
+ auto frame = element->first;\r
\r
{\r
critical_section::scoped_lock lock(mutex_); \r
\r
#include "../consumer/frame_consumer.h"\r
\r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
#include <common/memory/safe_ptr.h>\r
\r
#include <boost/noncopyable.hpp>\r
class output : boost::noncopyable\r
{\r
public:\r
- typedef Concurrency::ISource<safe_ptr<message<safe_ptr<read_frame>>>> source_t;\r
+ typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>> source_element_t;\r
+ typedef Concurrency::ISource<source_element_t> source_t;\r
\r
explicit output(source_t& source, const video_format_desc& format_desc);\r
\r
std::unordered_map<int, blend_mode::type> blend_modes_;\r
\r
critical_section mutex_;\r
- Concurrency::transformer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>, \r
- safe_ptr<message<safe_ptr<core::read_frame>>>> mixer_;\r
+ Concurrency::transformer<mixer::source_element_t, mixer::target_element_t> mixer_;\r
public:\r
implementation(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) \r
: format_desc_(format_desc)\r
source.link_target(&mixer_);\r
}\r
\r
- safe_ptr<message<safe_ptr<core::read_frame>>> mix(const safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>& msg)\r
+ mixer::target_element_t mix(const mixer::source_element_t& element)\r
{ \r
- auto frames = msg->value();\r
+ auto frames = element->first;\r
\r
auto frame = make_safe<read_frame>();\r
\r
Concurrency::wait(20);\r
}\r
\r
- return msg->transfer<safe_ptr<core::read_frame>>(std::move(frame)); \r
+ return mixer::target_element_t(std::make_pair(std::move(frame), element->second)); \r
}\r
\r
boost::unique_future<safe_ptr<core::write_frame>> async_create_frame(const void* tag, const core::pixel_format_desc& desc)\r
\r
#include "../producer/frame/frame_factory.h"\r
\r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
#include <common/memory/safe_ptr.h>\r
\r
#include <agents.h>\r
{\r
public: \r
\r
- typedef Concurrency::ISource<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> source_t;\r
- typedef Concurrency::ITarget<safe_ptr<message<safe_ptr<core::read_frame>>>> target_t;\r
+ typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> source_element_t;\r
+ typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>> target_element_t;\r
+\r
+ typedef Concurrency::ISource<source_element_t> source_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
explicit mixer(source_t& source, target_t& target, const video_format_desc& format_desc, ogl_device& ogl);\r
\r
Concurrency::critical_section mutex_;\r
stage::target_t& target_;\r
std::map<int, layer> layers_; \r
- safe_ptr<semaphore> semaphore_;\r
+ governor& governor_;\r
public:\r
- implementation(stage::target_t& target, const safe_ptr<semaphore>& semaphore) \r
+ implementation(stage::target_t& target, governor& governor) \r
: target_(target)\r
- , semaphore_(semaphore)\r
+ , governor_(governor)\r
{\r
start();\r
}\r
~implementation()\r
{\r
send(is_running_, false);\r
- semaphore_->release();\r
+ governor_.cancel();\r
agent::wait(this);\r
}\r
\r
});\r
}\r
\r
- send(target_, make_safe<message<decltype(frames)>>(frames, token(semaphore_)));\r
+ send(target_, stage::target_element_t(std::make_pair(frames, governor_.acquire())));\r
}\r
catch(...)\r
{\r
\r
};\r
\r
-stage::stage(target_t& target, const safe_ptr<semaphore>& semaphore) : impl_(new implementation(target, semaphore)){}\r
+stage::stage(target_t& target, governor& governor) : impl_(new implementation(target, governor)){}\r
void stage::swap(stage& other){impl_->swap(other);}\r
void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
void stage::pause(int index){impl_->pause(index);}\r
#include "frame_producer.h"\r
\r
#include <common/memory/safe_ptr.h>\r
-#include <common/concurrency/message.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
#include <agents.h>\r
-#include <semaphore.h>\r
\r
namespace caspar { namespace core {\r
\r
class stage : boost::noncopyable\r
{\r
public:\r
- typedef Concurrency::ITarget<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> target_t;\r
+ \r
+ typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> target_element_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
- explicit stage(target_t& target, const safe_ptr<Concurrency::semaphore>& semaphore);\r
+ explicit stage(target_t& target, governor& governor);\r
\r
void swap(stage& other);\r
\r
#include "mixer/mixer.h"\r
#include "producer/stage.h"\r
\r
-#include <common/concurrency/executor.h>\r
+#include <common/concurrency/governor.h>\r
#include <common/diagnostics/graph.h>\r
\r
#include "mixer/gpu/ogl_device.h"\r
\r
#include <agents_extras.h>\r
-#include <semaphore.h>\r
\r
#include <boost/timer.hpp>\r
\r
\r
struct video_channel::implementation : boost::noncopyable\r
{\r
- unbounded_buffer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> stage_frames_;\r
- unbounded_buffer<safe_ptr<message<safe_ptr<read_frame>>>> mixer_frames_;\r
+ unbounded_buffer<safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>>> stage_frames_;\r
+ unbounded_buffer<safe_ptr<std::pair<safe_ptr<read_frame>, ticket_t>>> mixer_frames_;\r
\r
const video_format_desc format_desc_;\r
\r
- safe_ptr<semaphore> semaphore_;\r
+ governor governor_;\r
safe_ptr<caspar::core::output> output_;\r
safe_ptr<caspar::core::mixer> mixer_;\r
safe_ptr<caspar::core::stage> stage_;\r
implementation(int index, const video_format_desc& format_desc, ogl_device& ogl) \r
: graph_(diagnostics::create_graph(narrow(print()), false))\r
, format_desc_(format_desc)\r
- , semaphore_(make_safe<semaphore>(3))\r
+ , governor_(3)\r
, output_(new caspar::core::output(mixer_frames_, format_desc))\r
, mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
- , stage_(new caspar::core::stage(stage_frames_, semaphore_)) \r
+ , stage_(new caspar::core::stage(stage_frames_, governor_)) \r
{\r
graph_->add_guide("produce-time", 0.5f); \r
graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
#pragma warning (pop)\r
#endif\r
\r
-#include <connect.h>\r
-#include <semaphore.h>\r
-\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
{ \r
CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
- Concurrency::connect(source, source_);\r
+ source.link_target(&source_);\r
\r
start();\r
}\r
#include <boost/noncopyable.hpp>\r
\r
#include <agents.h>\r
-#include <semaphore.h>\r
\r
#include <vector>\r
\r
\r
#include <agents.h>\r
#include <concrt_extras.h>\r
-#include <semaphore.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <agents.h>\r
-#include <semaphore.h>\r
-\r
struct AVFrame;\r
struct AVFormatContext;\r
struct AVPacket;\r
#pragma warning (pop)\r
#endif\r
\r
-#include <connect.h>\r
-#include <semaphore.h>\r
-\r
#include <tbb/scalable_allocator.h>\r
\r
using namespace Concurrency;\r
CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
\r
- Concurrency::connect(source, source_);\r
+ source.link_target(&source_);\r
\r
start();\r
}\r