* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
*\r
*/\r
+// TODO: Try to recover consumer from bad_alloc...\r
#include "../StdAfx.h"\r
\r
#ifdef _MSC_VER\r
\r
#include "output.h"\r
\r
-#include "../video_channel_context.h"\r
-\r
#include "../video_format.h"\r
#include "../mixer/gpu/ogl_device.h"\r
#include "../mixer/read_frame.h"\r
#include <common/utility/timer.h>\r
#include <common/memory/memshfl.h>\r
\r
-#include <tbb/mutex.h>\r
-\r
-namespace caspar { namespace core {\r
-\r
-class key_read_frame_muxer : public core::read_frame\r
-{\r
- ogl_device& ogl_;\r
- safe_ptr<read_frame> fill_;\r
- std::shared_ptr<host_buffer> key_;\r
- tbb::mutex mutex_;\r
-public:\r
- key_read_frame_muxer(ogl_device& ogl, const safe_ptr<read_frame>& fill)\r
- : ogl_(ogl)\r
- , fill_(fill)\r
- {\r
- }\r
-\r
- virtual const boost::iterator_range<const uint8_t*> image_data()\r
- {\r
- tbb::mutex::scoped_lock lock(mutex_);\r
- if(!key_)\r
- {\r
- key_ = ogl_.create_host_buffer(fill_->image_data().size(), host_buffer::write_only); \r
- fast_memsfhl(key_->data(), fill_->image_data().begin(), fill_->image_data().size(), 0x0F0F0F0F, 0x0B0B0B0B, 0x07070707, 0x03030303);\r
- }\r
+#include <concrt_extras.h>\r
\r
- auto ptr = static_cast<const uint8_t*>(key_->data());\r
- return boost::iterator_range<const uint8_t*>(ptr, ptr + key_->size());\r
- }\r
+using namespace Concurrency;\r
\r
- virtual const boost::iterator_range<const int16_t*> audio_data()\r
- {\r
- return fill_->audio_data();\r
- } \r
-};\r
+namespace caspar { namespace core {\r
\r
struct output::implementation\r
{ \r
typedef std::pair<safe_ptr<read_frame>, safe_ptr<read_frame>> fill_and_key;\r
\r
- video_channel_context& channel_;\r
+ const video_format_desc format_desc_;\r
\r
std::map<int, safe_ptr<frame_consumer>> consumers_;\r
typedef std::map<int, safe_ptr<frame_consumer>>::value_type layer_t;\r
\r
high_prec_timer timer_;\r
+\r
+ critical_section mutex_;\r
+ call<safe_ptr<message<safe_ptr<read_frame>>>> output_;\r
\r
public:\r
- implementation(video_channel_context& video_channel) \r
- : channel_(video_channel){} \r
+ implementation(output::source_t& source, const video_format_desc& format_desc) \r
+ : format_desc_(format_desc)\r
+ , output_(std::bind(&implementation::execute, this, std::placeholders::_1))\r
+ {\r
+ source.link_target(&output_);\r
+ } \r
\r
void add(int index, safe_ptr<frame_consumer>&& consumer)\r
{ \r
- consumer->initialize(channel_.get_format_desc());\r
- channel_.execution().invoke([&]\r
+ remove(index);\r
+ wait(100); // Wait a bit to allow asynchronous destruction to finish.\r
+\r
+ consumer->initialize(format_desc_); \r
+ \r
{\r
- consumers_.erase(index);\r
+ critical_section::scoped_lock lock(mutex_);\r
+\r
consumers_.insert(std::make_pair(index, consumer));\r
\r
- CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
- });\r
+ CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added."; \r
+ }\r
}\r
\r
void remove(int index)\r
{\r
- channel_.execution().invoke([&]\r
{\r
+ critical_section::scoped_lock lock(mutex_);\r
+\r
auto it = consumers_.find(index);\r
if(it != consumers_.end())\r
{\r
CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
consumers_.erase(it);\r
}\r
- });\r
+ }\r
}\r
\r
- void execute(const safe_ptr<read_frame>& frame)\r
- { \r
- try\r
- { \r
- if(!has_synchronization_clock())\r
- timer_.tick(1.0/channel_.get_format_desc().fps);\r
- \r
- auto fill = frame;\r
- auto key = make_safe<key_read_frame_muxer>(channel_.ogl(), frame);\r
+ void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+ { \r
+ auto frame = msg->value();\r
\r
- auto it = consumers_.begin();\r
- while(it != consumers_.end())\r
- {\r
+ {\r
+ critical_section::scoped_lock lock(mutex_); \r
+\r
+ if(!has_synchronization_clock() || frame->image_size() != format_desc_.size)\r
+ { \r
+ scoped_oversubcription_token oversubscribe;\r
+ timer_.tick(1.0/format_desc_.fps);\r
+ }\r
+ \r
+ std::vector<int> removables; \r
+ Concurrency::parallel_for_each(consumers_.begin(), consumers_.end(), [&](const decltype(*consumers_.begin())& pair)\r
+ { \r
try\r
{\r
- auto consumer = it->second;\r
-\r
- if(consumer->get_video_format_desc() != channel_.get_format_desc())\r
- consumer->initialize(channel_.get_format_desc());\r
-\r
- auto frame = consumer->key_only() ? key : fill;\r
-\r
- if(frame->image_size() == consumer->get_video_format_desc().size)\r
- consumer->send(frame);\r
-\r
- ++it;\r
+ if(!pair.second->send(frame))\r
+ removables.push_back(pair.first);\r
}\r
catch(...)\r
- {\r
+ { \r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(error) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
+ CASPAR_LOG(error) << "Consumer error. Trying to recover:" << pair.second->print();\r
+ try\r
+ {\r
+ pair.second->initialize(format_desc_);\r
+ pair.second->send(frame);\r
+ }\r
+ catch(...)\r
+ {\r
+ removables.push_back(pair.first); \r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ CASPAR_LOG(error) << "Failed to recover consumer: " << pair.second->print() << L". Removing it.";\r
+ }\r
}\r
- }\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ });\r
+\r
+ BOOST_FOREACH(auto& removable, removables)\r
+ consumers_.erase(removable); \r
}\r
}\r
\r
}\r
};\r
\r
-output::output(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
+output::output(output::source_t& source, const video_format_desc& format_desc) : impl_(new implementation(source, format_desc)){}\r
void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
void output::remove(int index){impl_->remove(index);}\r
-void output::execute(const safe_ptr<read_frame>& frame) {impl_->execute(frame); }\r
}}
\ No newline at end of file