<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
<ClInclude Include="concurrency\com_context.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
- <ClInclude Include="concurrency\governor.h" />\r
<ClInclude Include="concurrency\target.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="exception\exceptions.h" />\r
<ClInclude Include="concurrency\target.h">\r
<Filter>source\concurrency</Filter>\r
</ClInclude>\r
- <ClInclude Include="concurrency\governor.h">\r
- <Filter>source\concurrency</Filter>\r
- </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
+++ /dev/null
-#pragma once\r
-\r
-#include <memory>\r
-\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-\r
-#include <tbb/atomic.h>\r
-\r
-namespace caspar {\r
-\r
-typedef std::shared_ptr<void> ticket;\r
-\r
-namespace detail\r
-{ \r
- class governor_impl : public std::enable_shared_from_this<governor_impl>\r
- {\r
- boost::mutex mutex_;\r
- boost::condition_variable cond_;\r
- tbb::atomic<int> count_;\r
- public:\r
- governor_impl(size_t count) \r
- {\r
- count_ = count;\r
- }\r
-\r
- ticket acquire()\r
- {\r
- {\r
- boost::unique_lock<boost::mutex> lock(mutex_);\r
- while(count_ < 0)\r
- cond_.wait(lock);\r
- --count_;\r
- }\r
-\r
- auto self = shared_from_this();\r
- return ticket(nullptr, [self](void*)\r
- {\r
- ++self->count_;\r
- self->cond_.notify_one();\r
- });\r
- }\r
-\r
- void cancel()\r
- {\r
- count_ = std::numeric_limits<int>::max(); \r
- cond_.notify_all();\r
- }\r
- };\r
-}\r
-\r
-class governor\r
-{\r
- std::shared_ptr<detail::governor_impl> impl_;\r
-public:\r
-\r
- governor(size_t count) \r
- : impl_(new detail::governor_impl(count))\r
- {\r
- }\r
-\r
- ticket acquire()\r
- {\r
- return impl_->acquire();\r
- }\r
-\r
- void cancel()\r
- {\r
- impl_->cancel();\r
- }\r
-\r
-};\r
-\r
-}
\ No newline at end of file
return std::make_pair(min, max);\r
}\r
\r
- void send(const std::pair<safe_ptr<read_frame>, ticket>& packet)\r
+ void send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& packet)\r
{\r
executor_.begin_invoke([=]\r
{\r
output::output(const safe_ptr<diagnostics::graph>& graph, const video_format_desc& format_desc, int channel_index) : impl_(new implementation(graph, format_desc, channel_index)){}\r
void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
void output::remove(int index){impl_->remove(index);}\r
-void output::send(const std::pair<safe_ptr<read_frame>, ticket>& frame) {impl_->send(frame); }\r
+void output::send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) {impl_->send(frame); }\r
void output::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
}}
\ No newline at end of file
\r
#include <common/memory/safe_ptr.h>\r
#include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
#include <common/diagnostics/graph.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
namespace caspar { namespace core {\r
\r
-class output : public target<std::pair<safe_ptr<read_frame>, ticket>>, boost::noncopyable\r
+class output : public target<std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>>, boost::noncopyable\r
{\r
public:\r
explicit output(const safe_ptr<diagnostics::graph>& graph, const video_format_desc& format_desc, int channel_index);\r
\r
// target\r
\r
- virtual void send(const std::pair<safe_ptr<read_frame>, ticket>& frame) override;\r
+ virtual void send(const std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>& frame) override;\r
\r
// output\r
\r
graph_->set_color("mix-time", diagnostics::color(1.0f, 0.0f, 0.9f));\r
}\r
\r
- void send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>& packet)\r
+ void send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>& packet)\r
{ \r
executor_.begin_invoke([=]\r
{ \r
\r
mixer::mixer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl) \r
: impl_(new implementation(graph, target, format_desc, ogl)){}\r
-void mixer::send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>& frames){ impl_->send(frames);}\r
+void mixer::send(const std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>& frames){ impl_->send(frames);}\r
core::video_format_desc mixer::get_video_format_desc() const { return impl_->get_video_format_desc(); }\r
safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); } \r
safe_ptr<core::write_frame> mixer::create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt)\r
\r
#include <common/memory/safe_ptr.h>\r
#include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
#include <common/diagnostics/graph.h>\r
\r
#include <map>\r
struct frame_transform;\r
struct pixel_format;\r
\r
-class mixer : public target<std::pair<std::map<int, safe_ptr<core::basic_frame>>, ticket>>, public core::frame_factory\r
+class mixer : public target<std::pair<std::map<int, safe_ptr<core::basic_frame>>, std::shared_ptr<void>>>, public core::frame_factory\r
{\r
public: \r
- typedef target<std::pair<safe_ptr<read_frame>, ticket>> target_t;\r
+ typedef target<std::pair<safe_ptr<read_frame>, std::shared_ptr<void>>> target_t;\r
\r
explicit mixer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl);\r
\r
// target\r
\r
- virtual void send(const std::pair<std::map<int, safe_ptr<basic_frame>>, ticket>& frames) override; \r
+ virtual void send(const std::pair<std::map<int, safe_ptr<basic_frame>>, std::shared_ptr<void>>& frames) override; \r
\r
// mixer\r
\r
#include "frame/frame_factory.h"\r
\r
#include <common/concurrency/executor.h>\r
-#include <common/concurrency/governor.h>\r
-#include <common/env.h>\r
\r
#include <boost/foreach.hpp>\r
#include <boost/timer.hpp>\r
\r
namespace caspar { namespace core {\r
\r
-struct stage::implementation : boost::noncopyable\r
+struct stage::implementation : public std::enable_shared_from_this<implementation>\r
+ , boost::noncopyable\r
{ \r
safe_ptr<diagnostics::graph> graph_;\r
safe_ptr<stage::target_t> target_;\r
\r
std::map<int, layer> layers_; \r
\r
- governor governor_;\r
executor executor_;\r
public:\r
implementation(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<stage::target_t>& target, const video_format_desc& format_desc) \r
, format_desc_(format_desc)\r
, target_(target)\r
, executor_(L"stage")\r
- , governor_(std::max(1, env::properties().get("configuration.pipeline-tokens", 2)))\r
{\r
graph_->add_guide("tick-time", 0.5f); \r
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); \r
graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
-\r
- executor_.begin_invoke([this]{tick();});\r
}\r
\r
- ~implementation()\r
+ void spawn_token()\r
{\r
- governor_.cancel();\r
+ std::weak_ptr<implementation> self = shared_from_this();\r
+ executor_.begin_invoke([=]{tick(self);});\r
}\r
- \r
- void tick()\r
+ \r
+ void tick(const std::weak_ptr<implementation>& self)\r
{ \r
try\r
{\r
- auto ticket = governor_.acquire();\r
-\r
produce_timer_.restart();\r
\r
std::map<int, safe_ptr<basic_frame>> frames;\r
\r
graph_->update_value("produce-time", produce_timer_.elapsed()*format_desc_.fps*0.5);\r
\r
+ std::shared_ptr<void> ticket(nullptr, [self](void*)\r
+ {\r
+ auto self2 = self.lock();\r
+ if(self2) \r
+ self2->executor_.begin_invoke([=]{tick(self);}); \r
+ });\r
+\r
target_->send(std::make_pair(frames, ticket));\r
\r
graph_->update_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5);\r
layers_.clear();\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
} \r
- executor_.begin_invoke([this]{tick();});\r
}\r
\r
void load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta)\r
};\r
\r
stage::stage(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc) : impl_(new implementation(graph, target, format_desc)){}\r
+void stage::spawn_token(){impl_->spawn_token();}\r
void stage::swap(stage& other){impl_->swap(other);}\r
void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
void stage::pause(int index){impl_->pause(index);}\r
\r
#include <common/memory/safe_ptr.h>\r
#include <common/concurrency/target.h>\r
-#include <common/concurrency/governor.h>\r
#include <common/diagnostics/graph.h>\r
\r
#include <boost/noncopyable.hpp>\r
class stage : boost::noncopyable\r
{\r
public:\r
- typedef target<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket>> target_t;\r
+ typedef target<std::pair<std::map<int, safe_ptr<basic_frame>>, std::shared_ptr<void>>> target_t;\r
\r
explicit stage(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<target_t>& target, const video_format_desc& format_desc);\r
\r
// stage\r
\r
+ void spawn_token();\r
+\r
void swap(stage& other);\r
\r
void load(int index, const safe_ptr<frame_producer>& producer, bool preview = false, int auto_play_delta = -1);\r
#include "producer/stage.h"\r
\r
#include <common/diagnostics/graph.h>\r
+#include <common/env.h>\r
\r
#include <string>\r
\r
graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
\r
+ for(int n = 0; n < std::max(1, env::properties().get("configuration.pipeline-tokens", 2)); ++n)\r
+ stage_->spawn_token();\r
+\r
CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
}\r
\r