From 2b27061fbf2ec4251c7689ccf6096893412e1d43 Mon Sep 17 00:00:00 2001 From: ronag Date: Sun, 23 Oct 2011 13:07:09 +0000 Subject: [PATCH] git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0 concrt-experimental@1403 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- core/consumer/frame_consumer.cpp | 83 +++++++++++++++++++++++++++++--- core/consumer/output.cpp | 52 -------------------- core/producer/frame_producer.cpp | 19 +++----- core/producer/frame_producer.h | 2 - core/producer/stage.cpp | 2 +- 5 files changed, 85 insertions(+), 73 deletions(-) diff --git a/core/consumer/frame_consumer.cpp b/core/consumer/frame_consumer.cpp index 02ed56b85..b6c5d38a4 100644 --- a/core/consumer/frame_consumer.cpp +++ b/core/consumer/frame_consumer.cpp @@ -25,18 +25,89 @@ #include #include -namespace caspar { namespace core { +#include +namespace caspar { namespace core { + size_t consumer_buffer_depth() { return env::properties().get("configuration.consumers.buffer-depth", 5); } + +struct destruction_context +{ + std::shared_ptr consumer; + Concurrency::event event; + + destruction_context(std::shared_ptr&& consumer) + : consumer(consumer) + { + } +}; + +void __cdecl destroy_consumer(LPVOID lpParam) +{ + auto destruction = std::unique_ptr(static_cast(lpParam)); + + try + { + if(destruction->consumer.unique()) + { + Concurrency::scoped_oversubcription_token oversubscribe; + destruction->consumer.reset(); + } + else + CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously."; + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } -std::vector g_factories; + destruction->event.set(); +} + +void __cdecl destroy_and_wait_consumer(LPVOID lpParam) +{ + try + { + auto destruction = static_cast(lpParam); + Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam); + if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT) + CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources."; + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } +} + +class destroy_consumer_proxy : public frame_consumer +{ + std::shared_ptr consumer_; +public: + destroy_consumer_proxy(const std::shared_ptr& consumer) + : consumer_(consumer) + { + } + + ~destroy_consumer_proxy() + { + Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer_))); + } + + virtual bool send(const safe_ptr& frame) {return consumer_->send(frame);} + virtual void initialize(const video_format_desc& format_desc) {return consumer_->initialize(format_desc);} + virtual std::wstring print() const {return consumer_->print();} + virtual bool has_synchronization_clock() const {return consumer_->has_synchronization_clock();} + virtual const core::video_format_desc& get_video_format_desc() const {return consumer_->get_video_format_desc();} +}; + +Concurrency::concurrent_vector> g_factories; void register_consumer_factory(const consumer_factory_t& factory) { - g_factories.push_back(factory); + g_factories.push_back(std::make_shared(factory)); } safe_ptr create_consumer(const std::vector& params) @@ -45,11 +116,11 @@ safe_ptr create_consumer(const std::vector& BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info("")); auto consumer = frame_consumer::empty(); - std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool + std::any_of(g_factories.begin(), g_factories.end(), [&](const std::shared_ptr& factory) -> bool { try { - consumer = factory(params); + consumer = (*factory)(params); } catch(...) { @@ -61,7 +132,7 @@ safe_ptr create_consumer(const std::vector& if(consumer == frame_consumer::empty()) BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax.")); - return consumer; + return make_safe(consumer); } }} \ No newline at end of file diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index e8adaca5e..c863ee620 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -41,54 +41,6 @@ using namespace Concurrency; namespace caspar { namespace core { -struct destruction_context -{ - std::shared_ptr consumer; - Concurrency::event event; - - destruction_context(std::shared_ptr&& consumer) - : consumer(consumer) - { - } -}; - -void __cdecl destroy_consumer(LPVOID lpParam) -{ - auto destruction = std::unique_ptr(static_cast(lpParam)); - - try - { - if(destruction->consumer.unique()) - { - Concurrency::scoped_oversubcription_token oversubscribe; - destruction->consumer.reset(); - } - else - CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously."; - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - } - - destruction->event.set(); -} - -void __cdecl destroy_and_wait_consumer(LPVOID lpParam) -{ - try - { - auto destruction = static_cast(lpParam); - Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam); - if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT) - CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources."; - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - } -} - struct output::implementation { typedef std::pair, safe_ptr> fill_and_key; @@ -178,11 +130,7 @@ public: }); BOOST_FOREACH(auto& removable, removables) - { - std::shared_ptr consumer = consumers_.find(removable)->second; consumers_.erase(removable); - Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer))); - } } private: diff --git a/core/producer/frame_producer.cpp b/core/producer/frame_producer.cpp index 50e61b328..1d4e6d133 100644 --- a/core/producer/frame_producer.cpp +++ b/core/producer/frame_producer.cpp @@ -31,11 +31,10 @@ #include #include +#include namespace caspar { namespace core { -std::vector g_factories; - struct destruction_context { std::shared_ptr producer; @@ -107,11 +106,6 @@ public: virtual int64_t nb_frames() const {return producer_->nb_frames();} }; -safe_ptr create_destroy_producer_proxy(const safe_ptr& producer) -{ - return make_safe(producer); -} - class last_frame_producer : public frame_producer { const std::wstring print_; @@ -165,10 +159,12 @@ safe_ptr receive_and_follow(safe_ptr& producer, int } return frame; } + +Concurrency::concurrent_vector> g_factories; void register_producer_factory(const producer_factory_t& factory) { - g_factories.push_back(factory); + g_factories.push_back(std::make_shared(factory)); } safe_ptr do_create_producer(const safe_ptr& my_frame_factory, const std::vector& params) @@ -177,11 +173,11 @@ safe_ptr do_create_producer(const safe_ptr& BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info("")); auto producer = frame_producer::empty(); - std::any_of(g_factories.begin(), g_factories.end(), [&](const producer_factory_t& factory) -> bool + std::any_of(g_factories.begin(), g_factories.end(), [&](const std::shared_ptr& factory) -> bool { try { - producer = factory(my_frame_factory, params); + producer = (*factory)(my_frame_factory, params); } catch(...) { @@ -193,10 +189,9 @@ safe_ptr do_create_producer(const safe_ptr& if(producer == frame_producer::empty()) producer = create_color_producer(my_frame_factory, params); - return producer; + return make_safe(producer); } - safe_ptr create_producer(const safe_ptr& my_frame_factory, const std::vector& params) { auto producer = do_create_producer(my_frame_factory, params); diff --git a/core/producer/frame_producer.h b/core/producer/frame_producer.h index 067a4ee5f..c19fec211 100644 --- a/core/producer/frame_producer.h +++ b/core/producer/frame_producer.h @@ -71,8 +71,6 @@ typedef std::function(const safe_ptr create_producer(const safe_ptr&, const std::vector& params); -safe_ptr create_destroy_producer_proxy(const safe_ptr& producer); - template typename std::decay::type get_param(const std::wstring& name, const std::vector& params, T fail_value) { diff --git a/core/producer/stage.cpp b/core/producer/stage.cpp index ff9d4d26e..a86ec13af 100644 --- a/core/producer/stage.cpp +++ b/core/producer/stage.cpp @@ -100,7 +100,7 @@ public: void load(int index, const safe_ptr& producer, bool preview, int auto_play_delta) { critical_section::scoped_lock lock(mutex_); - layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta); + layers_[index].load(producer, preview, auto_play_delta); } void pause(int index) -- 2.39.2