#include <common/memory/safe_ptr.h>\r
#include <common/exception/exceptions.h>\r
\r
-namespace caspar { namespace core {\r
+#include <concurrent_vector.h>\r
\r
+namespace caspar { namespace core {\r
+ \r
size_t consumer_buffer_depth()\r
{\r
return env::properties().get("configuration.consumers.buffer-depth", 5);\r
}\r
+\r
+struct destruction_context\r
+{\r
+ std::shared_ptr<frame_consumer> consumer;\r
+ Concurrency::event event;\r
+\r
+ destruction_context(std::shared_ptr<frame_consumer>&& consumer) \r
+ : consumer(consumer)\r
+ {\r
+ }\r
+};\r
+\r
+void __cdecl destroy_consumer(LPVOID lpParam)\r
+{\r
+ auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
+ \r
+ try\r
+ { \r
+ if(destruction->consumer.unique())\r
+ {\r
+ Concurrency::scoped_oversubcription_token oversubscribe;\r
+ destruction->consumer.reset();\r
+ }\r
+ else\r
+ CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously."; \r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
\r
-std::vector<const consumer_factory_t> g_factories;\r
+ destruction->event.set();\r
+}\r
+\r
+void __cdecl destroy_and_wait_consumer(LPVOID lpParam)\r
+{\r
+ try\r
+ {\r
+ auto destruction = static_cast<destruction_context*>(lpParam);\r
+ Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam);\r
+ if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT)\r
+ CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources.";\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
+}\r
+ \r
+class destroy_consumer_proxy : public frame_consumer\r
+{\r
+ std::shared_ptr<frame_consumer> consumer_;\r
+public:\r
+ destroy_consumer_proxy(const std::shared_ptr<frame_consumer>& consumer) \r
+ : consumer_(consumer)\r
+ {\r
+ }\r
+\r
+ ~destroy_consumer_proxy()\r
+ { \r
+ Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer_)));\r
+ } \r
+ \r
+ virtual bool send(const safe_ptr<read_frame>& frame) {return consumer_->send(frame);}\r
+ virtual void initialize(const video_format_desc& format_desc) {return consumer_->initialize(format_desc);}\r
+ virtual std::wstring print() const {return consumer_->print();}\r
+ virtual bool has_synchronization_clock() const {return consumer_->has_synchronization_clock();}\r
+ virtual const core::video_format_desc& get_video_format_desc() const {return consumer_->get_video_format_desc();}\r
+};\r
+\r
+Concurrency::concurrent_vector<std::shared_ptr<consumer_factory_t>> g_factories;\r
\r
void register_consumer_factory(const consumer_factory_t& factory)\r
{\r
- g_factories.push_back(factory);\r
+ g_factories.push_back(std::make_shared<consumer_factory_t>(factory));\r
}\r
\r
safe_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
\r
auto consumer = frame_consumer::empty();\r
- std::any_of(g_factories.begin(), g_factories.end(), [&](const consumer_factory_t& factory) -> bool\r
+ std::any_of(g_factories.begin(), g_factories.end(), [&](const std::shared_ptr<consumer_factory_t>& factory) -> bool\r
{\r
try\r
{\r
- consumer = factory(params);\r
+ consumer = (*factory)(params);\r
}\r
catch(...)\r
{\r
if(consumer == frame_consumer::empty())\r
BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
\r
- return consumer;\r
+ return make_safe<destroy_consumer_proxy>(consumer);\r
}\r
\r
}}
\ No newline at end of file
\r
namespace caspar { namespace core {\r
\r
-struct destruction_context\r
-{\r
- std::shared_ptr<frame_consumer> consumer;\r
- Concurrency::event event;\r
-\r
- destruction_context(std::shared_ptr<frame_consumer>&& consumer) \r
- : consumer(consumer)\r
- {\r
- }\r
-};\r
-\r
-void __cdecl destroy_consumer(LPVOID lpParam)\r
-{\r
- auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
- \r
- try\r
- { \r
- if(destruction->consumer.unique())\r
- {\r
- Concurrency::scoped_oversubcription_token oversubscribe;\r
- destruction->consumer.reset();\r
- }\r
- else\r
- CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously."; \r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- \r
- destruction->event.set();\r
-}\r
-\r
-void __cdecl destroy_and_wait_consumer(LPVOID lpParam)\r
-{\r
- try\r
- {\r
- auto destruction = static_cast<destruction_context*>(lpParam);\r
- Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam);\r
- if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT)\r
- CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources.";\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
-}\r
-\r
struct output::implementation\r
{ \r
typedef std::pair<safe_ptr<read_frame>, safe_ptr<read_frame>> fill_and_key;\r
});\r
\r
BOOST_FOREACH(auto& removable, removables)\r
- {\r
- std::shared_ptr<frame_consumer> consumer = consumers_.find(removable)->second;\r
consumers_.erase(removable); \r
- Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer)));\r
- }\r
}\r
\r
private:\r
#include <common/exception/exceptions.h>\r
\r
#include <concrt_extras.h>\r
+#include <concurrent_vector.h>\r
\r
namespace caspar { namespace core {\r
\r
-std::vector<const producer_factory_t> g_factories;\r
- \r
struct destruction_context\r
{\r
std::shared_ptr<frame_producer> producer;\r
virtual int64_t nb_frames() const {return producer_->nb_frames();}\r
};\r
\r
-safe_ptr<core::frame_producer> create_destroy_producer_proxy(const safe_ptr<frame_producer>& producer)\r
-{\r
- return make_safe<destroy_producer_proxy>(producer);\r
-}\r
-\r
class last_frame_producer : public frame_producer\r
{\r
const std::wstring print_;\r
}\r
return frame;\r
}\r
+ \r
+Concurrency::concurrent_vector<std::shared_ptr<producer_factory_t>> g_factories;\r
\r
void register_producer_factory(const producer_factory_t& factory)\r
{\r
- g_factories.push_back(factory);\r
+ g_factories.push_back(std::make_shared<producer_factory_t>(factory));\r
}\r
\r
safe_ptr<core::frame_producer> do_create_producer(const safe_ptr<frame_factory>& my_frame_factory, const std::vector<std::wstring>& params)\r
BOOST_THROW_EXCEPTION(invalid_argument() << arg_name_info("params") << arg_value_info(""));\r
\r
auto producer = frame_producer::empty();\r
- std::any_of(g_factories.begin(), g_factories.end(), [&](const producer_factory_t& factory) -> bool\r
+ std::any_of(g_factories.begin(), g_factories.end(), [&](const std::shared_ptr<producer_factory_t>& factory) -> bool\r
{\r
try\r
{\r
- producer = factory(my_frame_factory, params);\r
+ producer = (*factory)(my_frame_factory, params);\r
}\r
catch(...)\r
{\r
if(producer == frame_producer::empty())\r
producer = create_color_producer(my_frame_factory, params);\r
\r
- return producer;\r
+ return make_safe<destroy_producer_proxy>(producer);\r
}\r
\r
-\r
safe_ptr<core::frame_producer> create_producer(const safe_ptr<frame_factory>& my_frame_factory, const std::vector<std::wstring>& params)\r
{ \r
auto producer = do_create_producer(my_frame_factory, params);\r
void register_producer_factory(const producer_factory_t& factory); // Not thread-safe.\r
safe_ptr<core::frame_producer> create_producer(const safe_ptr<frame_factory>&, const std::vector<std::wstring>& params);\r
\r
-safe_ptr<core::frame_producer> create_destroy_producer_proxy(const safe_ptr<frame_producer>& producer);\r
-\r
template<typename T>\r
typename std::decay<T>::type get_param(const std::wstring& name, const std::vector<std::wstring>& params, T fail_value)\r
{ \r
void load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta)\r
{\r
critical_section::scoped_lock lock(mutex_);\r
- layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta);\r
+ layers_[index].load(producer, preview, auto_play_delta);\r
}\r
\r
void pause(int index)\r