+++ /dev/null
-#include "../stdafx.h"\r
-\r
-#include "cadence_guard.h"\r
-\r
-#include "frame_consumer.h"\r
-\r
-#include <common/env.h>\r
-#include <common/spl/memory.h>\r
-#include <common/except.h>\r
-\r
-#include <core/video_format.h>\r
-#include <core/frame/data_frame.h>\r
-\r
-namespace caspar { namespace core {\r
-\r
-// This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
-class cadence_guard : public frame_consumer\r
-{\r
- spl::shared_ptr<frame_consumer> consumer_;\r
- std::vector<int> audio_cadence_;\r
- boost::circular_buffer<int> sync_buffer_;\r
-public:\r
- cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
- : consumer_(consumer)\r
- {\r
- }\r
- \r
- virtual void initialize(const video_format_desc& format_desc, int channel_index) override\r
- {\r
- audio_cadence_ = format_desc.audio_cadence;\r
- sync_buffer_ = boost::circular_buffer<int>(format_desc.audio_cadence.size());\r
- consumer_->initialize(format_desc, channel_index);\r
- }\r
-\r
- virtual bool send(const spl::shared_ptr<const data_frame>& frame) override\r
- { \r
- if(audio_cadence_.size() == 1)\r
- return consumer_->send(frame);\r
-\r
- bool result = true;\r
- \r
- if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame->audio_data().size())) \r
- { \r
- // Audio sent so far is in sync, now we can send the next chunk.\r
- result = consumer_->send(frame);\r
- boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
- }\r
- else\r
- CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
-\r
- sync_buffer_.push_back(static_cast<int>(frame->audio_data().size()));\r
- \r
- return result;\r
- }\r
-\r
- virtual std::wstring print() const override\r
- {\r
- return consumer_->print();\r
- }\r
-\r
- virtual boost::property_tree::wptree info() const override\r
- {\r
- return consumer_->info();\r
- }\r
-\r
- virtual bool has_synchronization_clock() const override\r
- {\r
- return consumer_->has_synchronization_clock();\r
- }\r
-\r
- virtual int buffer_depth() const override\r
- {\r
- return consumer_->buffer_depth();\r
- }\r
-\r
- virtual int index() const override\r
- {\r
- return consumer_->index();\r
- }\r
-};\r
-\r
-spl::shared_ptr<frame_consumer> create_consumer_cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
-{\r
- return spl::make_shared<cadence_guard>(std::move(consumer));\r
-}\r
-\r
-}}
\ No newline at end of file
+++ /dev/null
-#pragma once\r
-\r
-#include <common/spl/memory.h>\r
-\r
-namespace caspar { namespace core {\r
- \r
-spl::shared_ptr<struct frame_consumer> create_consumer_cadence_guard(const spl::shared_ptr<struct frame_consumer>& consumer);\r
-\r
-}}
\ No newline at end of file
\r
#include "frame_consumer.h"\r
\r
+#include <common/except.h>\r
+\r
+#include <core/video_format.h>\r
+#include <core/frame/data_frame.h>\r
+\r
+#include <common/concurrency/async.h>\r
+\r
namespace caspar { namespace core {\r
\r
std::vector<const consumer_factory_t> g_factories;\r
g_factories.push_back(factory);\r
}\r
\r
+class destroy_consumer_proxy : public frame_consumer\r
+{ \r
+ std::unique_ptr<std::shared_ptr<frame_consumer>> consumer_;\r
+public:\r
+ destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
+ : consumer_(new std::shared_ptr<frame_consumer>(std::move(consumer)))\r
+ {\r
+ }\r
+\r
+ ~destroy_consumer_proxy()\r
+ { \r
+ static tbb::atomic<int> counter = tbb::atomic<int>();\r
+ \r
+ ++counter;\r
+ CASPAR_VERIFY(counter < 32);\r
+ \r
+ auto consumer = consumer_.release();\r
+ async([=]\r
+ {\r
+ std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);\r
+\r
+ auto str = (*consumer)->print();\r
+ try\r
+ {\r
+ if(!consumer->unique())\r
+ CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();\r
+ else\r
+ CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
+ }\r
+ catch(...){}\r
+\r
+ pointer_guard.reset();\r
+\r
+ --counter;\r
+ }); \r
+ }\r
+ \r
+ virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) { return (*consumer_)->send(frame);}\r
+ virtual void initialize(const struct video_format_desc& format_desc, int channel_index) { return (*consumer_)->initialize(format_desc, channel_index);}\r
+ virtual std::wstring print() const { return (*consumer_)->print();}\r
+ virtual boost::property_tree::wptree info() const { return (*consumer_)->info();}\r
+ virtual bool has_synchronization_clock() const { return (*consumer_)->has_synchronization_clock();}\r
+ virtual int buffer_depth() const { return (*consumer_)->buffer_depth();}\r
+ virtual int index() const { return (*consumer_)->index();}\r
+};\r
+\r
+class print_consumer_proxy : public frame_consumer\r
+{ \r
+ std::shared_ptr<frame_consumer> consumer_;\r
+public:\r
+ print_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer) \r
+ : consumer_(std::move(consumer))\r
+ {\r
+ CASPAR_LOG(info) << consumer_->print() << L" Initialized.";\r
+ }\r
+\r
+ ~print_consumer_proxy()\r
+ { \r
+ auto str = consumer_->print();\r
+ CASPAR_LOG(trace) << str << L" Uninitializing.";\r
+ consumer_.reset();\r
+ CASPAR_LOG(info) << str << L" Uninitialized.";\r
+ }\r
+ \r
+ virtual bool send(const spl::shared_ptr<const struct data_frame>& frame) { return consumer_->send(frame);}\r
+ virtual void initialize(const struct video_format_desc& format_desc, int channel_index) { return consumer_->initialize(format_desc, channel_index);}\r
+ virtual std::wstring print() const { return consumer_->print();}\r
+ virtual boost::property_tree::wptree info() const { return consumer_->info();}\r
+ virtual bool has_synchronization_clock() const { return consumer_->has_synchronization_clock();}\r
+ virtual int buffer_depth() const { return consumer_->buffer_depth();}\r
+ virtual int index() const { return consumer_->index();}\r
+};\r
+\r
+// This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
+class cadence_guard : public frame_consumer\r
+{\r
+ spl::shared_ptr<frame_consumer> consumer_;\r
+ std::vector<int> audio_cadence_;\r
+ boost::circular_buffer<int> sync_buffer_;\r
+public:\r
+ cadence_guard(const spl::shared_ptr<frame_consumer>& consumer)\r
+ : consumer_(consumer)\r
+ {\r
+ }\r
+ \r
+ virtual void initialize(const video_format_desc& format_desc, int channel_index) override\r
+ {\r
+ audio_cadence_ = format_desc.audio_cadence;\r
+ sync_buffer_ = boost::circular_buffer<int>(format_desc.audio_cadence.size());\r
+ consumer_->initialize(format_desc, channel_index);\r
+ }\r
+\r
+ virtual bool send(const spl::shared_ptr<const data_frame>& frame) override\r
+ { \r
+ if(audio_cadence_.size() == 1)\r
+ return consumer_->send(frame);\r
+\r
+ bool result = true;\r
+ \r
+ if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame->audio_data().size())) \r
+ { \r
+ // Audio sent so far is in sync, now we can send the next chunk.\r
+ result = consumer_->send(frame);\r
+ boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
+ }\r
+ else\r
+ CASPAR_LOG(trace) << print() << L" Syncing audio.";\r
+\r
+ sync_buffer_.push_back(static_cast<int>(frame->audio_data().size()));\r
+ \r
+ return result;\r
+ }\r
+\r
+ virtual std::wstring print() const override {return consumer_->print(); }\r
+ virtual boost::property_tree::wptree info() const override {return consumer_->info(); }\r
+ virtual bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
+ virtual int buffer_depth() const override {return consumer_->buffer_depth();}\r
+ virtual int index() const override {return consumer_->index();}\r
+};\r
+\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
{\r
if(params.empty())\r
if(consumer == frame_consumer::empty())\r
BOOST_THROW_EXCEPTION(file_not_found() << msg_info("No match found for supplied commands. Check syntax."));\r
\r
- return consumer;\r
+ return spl::make_shared<destroy_consumer_proxy>(\r
+ spl::make_shared<print_consumer_proxy>(\r
+ spl::make_shared<cadence_guard>(\r
+ std::move(consumer))));\r
}\r
\r
const spl::shared_ptr<frame_consumer>& frame_consumer::empty()\r
\r
#include "frame_consumer.h"\r
\r
-#include "cadence_guard.h"\r
-\r
#include "../video_format.h"\r
#include "../frame/data_frame.h"\r
\r
+#include <common/concurrency/async.h>\r
#include <common/concurrency/executor.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/prec_timer.h>\r
\r
struct output::impl\r
{ \r
- const int channel_index_;\r
- video_format_desc format_desc_;\r
+ const int channel_index_;\r
+ video_format_desc format_desc_;\r
std::map<int, spl::shared_ptr<frame_consumer>> consumers_; \r
- prec_timer sync_timer_;\r
+ prec_timer sync_timer_;\r
boost::circular_buffer<spl::shared_ptr<const data_frame>> frames_;\r
-\r
- executor executor_; \r
+ executor executor_; \r
public:\r
impl(const video_format_desc& format_desc, int channel_index) \r
: channel_index_(channel_index)\r
{ \r
remove(index);\r
\r
- consumer = create_consumer_cadence_guard(consumer);\r
consumer->initialize(format_desc_, channel_index_);\r
\r
- executor_.invoke([&]\r
+ executor_.begin_invoke([=]\r
{\r
consumers_.insert(std::make_pair(index, consumer));\r
CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
\r
void remove(int index)\r
{ \r
- auto consumer = executor_.invoke([&]() -> std::shared_ptr<frame_consumer>\r
+ executor_.begin_invoke([=]\r
{\r
- auto consumer = frame_consumer::empty();\r
auto it = consumers_.find(index);\r
if(it != consumers_.end())\r
- {\r
- consumer = it->second;\r
- consumers_.erase(it);\r
- }\r
- return consumer;\r
+ consumers_.erase(it); \r
}, task_priority::high_priority);\r
-\r
- // Destroy consumer on calling thread:\r
- if(consumer)\r
- {\r
- auto str = consumer->print();\r
- consumer.reset();\r
- CASPAR_LOG(info) << print() << L" " << str << L" Removed.";\r
- }\r
}\r
\r
void remove(const spl::shared_ptr<frame_consumer>& consumer)\r
</Lib>\r
</ItemDefinitionGroup>\r
<ItemGroup>\r
- <ClInclude Include="consumer\cadence_guard.h" />\r
<ClInclude Include="frame\draw_frame.h" />\r
<ClInclude Include="frame\data_frame.h" />\r
<ClInclude Include="frame\frame_factory.h" />\r
<ClInclude Include="StdAfx.h" />\r
</ItemGroup>\r
<ItemGroup>\r
- <ClCompile Include="consumer\cadence_guard.cpp">\r
- <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
- <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
- </ClCompile>\r
<ClCompile Include="frame\data_frame.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<ClInclude Include="frame\draw_frame.h">\r
<Filter>source\frame</Filter>\r
</ClInclude>\r
- <ClInclude Include="consumer\cadence_guard.h">\r
- <Filter>source\consumer</Filter>\r
- </ClInclude>\r
<ClInclude Include="frame\write_frame.h">\r
<Filter>source\frame</Filter>\r
</ClInclude>\r
<ClCompile Include="frame\data_frame.cpp">\r
<Filter>source\frame</Filter>\r
</ClCompile>\r
- <ClCompile Include="consumer\cadence_guard.cpp">\r
- <Filter>source\consumer</Filter>\r
- </ClCompile>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
~destroy_producer_proxy()\r
{ \r
static tbb::atomic<int> counter = tbb::atomic<int>();\r
-\r
- try\r
- { \r
- auto producer = producer_.release();\r
- ++counter;\r
- CASPAR_VERIFY(counter < 32);\r
-\r
- async([=]\r
- {\r
- std::unique_ptr<std::shared_ptr<frame_producer>> producer2(producer);\r
-\r
- auto str = (*producer2)->print();\r
- try\r
- {\r
- if(!producer->unique())\r
- CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << producer->use_count();\r
- else\r
- CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
- }\r
- catch(...){}\r
- \r
- producer2.reset();\r
-\r
- --counter;\r
- }); \r
- }\r
- catch(...)\r
+ \r
+ ++counter;\r
+ CASPAR_VERIFY(counter < 32);\r
+ \r
+ auto producer = producer_.release();\r
+ async([=]\r
{\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ std::unique_ptr<std::shared_ptr<frame_producer>> pointer_guard(producer);\r
+\r
+ auto str = (*producer)->print();\r
try\r
{\r
- producer_.reset();\r
+ if(!producer->unique())\r
+ CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << producer->use_count();\r
+ else\r
+ CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";\r
}\r
catch(...){}\r
- }\r
+\r
+ pointer_guard.reset();\r
+\r
+ --counter;\r
+ }); \r
}\r
\r
virtual spl::shared_ptr<draw_frame> receive(int hints) override {return (*producer_)->receive(hints);}\r
if(producer == frame_producer::empty())\r
return producer;\r
\r
- return spl::make_shared<destroy_producer_proxy>(spl::make_shared<print_producer_proxy>(std::move(producer)));\r
+ return spl::make_shared<destroy_producer_proxy>(\r
+ spl::make_shared<print_producer_proxy>(\r
+ std::move(producer)));\r
}\r
\r
spl::shared_ptr<core::frame_producer> create_producer(const spl::shared_ptr<frame_factory>& my_frame_factory, const std::vector<std::wstring>& params)\r
{\r
}\r
\r
- ~bluefish_consumer_proxy()\r
- {\r
- if(consumer_)\r
- {\r
- auto str = print();\r
- consumer_.reset();\r
- CASPAR_LOG(info) << str << L" Successfully Uninitialized."; \r
- }\r
- }\r
-\r
// frame_consumer\r
\r
virtual void initialize(const core::video_format_desc& format_desc, int channel_index) override\r
{\r
consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_, channel_index));\r
- audio_cadence_ = format_desc.audio_cadence;\r
- CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
+ audio_cadence_ = format_desc.audio_cadence; \r
}\r
\r
virtual bool send(const spl::shared_ptr<const core::data_frame>& frame) override\r
{\r
executor_.invoke([=]\r
{\r
- if(consumer_)\r
- {\r
- auto str = print();\r
- consumer_.reset();\r
- CASPAR_LOG(info) << str << L" Successfully Uninitialized."; \r
- }\r
::CoUninitialize();\r
});\r
}\r
executor_.invoke([=]\r
{\r
consumer_.reset(new decklink_consumer(config_, format_desc, channel_index)); \r
- audio_cadence_ = format_desc.audio_cadence; \r
-\r
- CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
+ audio_cadence_ = format_desc.audio_cadence; \r
});\r
}\r
\r
THROW_ON_ERROR2(avio_open(&oc_->pb, u8(filename_).c_str(), URL_WRONLY), "[ffmpeg_consumer]");\r
\r
THROW_ON_ERROR2(av_write_header(oc_.get()), "[ffmpeg_consumer]");\r
-\r
- CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
}\r
\r
~ffmpeg_consumer()\r
\r
if (!(oc_->oformat->flags & AVFMT_NOFILE)) \r
LOG_ON_ERROR2(avio_close(oc_->pb), "[ffmpeg_consumer]"); // Close the output ffmpeg.\r
-\r
- CASPAR_LOG(info) << print() << L" Successfully Uninitialized."; \r
}\r
\r
std::wstring print() const\r
Stop();\r
input_.try_push(std::make_shared<audio_buffer_16>());\r
input_.try_push(std::make_shared<audio_buffer_16>());\r
-\r
- CASPAR_LOG(info) << print() << L" Successfully Uninitialized."; \r
}\r
\r
// frame consumer\r
sf::SoundStream::Initialize(2, 48000);\r
Play(); \r
}\r
- CASPAR_LOG(info) << print() << " Sucessfully Initialized.";\r
}\r
\r
virtual bool send(const spl::shared_ptr<const core::data_frame>& frame) override\r
screen_consumer_proxy(const configuration& config)\r
: config_(config){}\r
\r
- ~screen_consumer_proxy()\r
- {\r
- if(consumer_)\r
- {\r
- auto str = print();\r
- consumer_.reset();\r
- CASPAR_LOG(info) << str << L" Successfully Uninitialized."; \r
- }\r
- }\r
-\r
// frame_consumer\r
\r
virtual void initialize(const core::video_format_desc& format_desc, int channel_index) override\r
{\r
consumer_.reset();\r
consumer_.reset(new screen_consumer(config_, format_desc, channel_index));\r
- CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
}\r
\r
virtual bool send(const spl::shared_ptr<const core::data_frame>& frame) override\r