]> git.sesse.net Git - casparcg/blobdiff - core/consumer/output.cpp
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / core / consumer / output.cpp
index e30382a97c6040832d5f92d92131c76c37338dcb..1beb69abae01ec56e6671dd62808b6565700b9cb 100644 (file)
@@ -17,6 +17,7 @@
 *    along with CasparCG.  If not, see <http://www.gnu.org/licenses/>.\r
 *\r
 */\r
+// TODO: Try to recover consumer from bad_alloc...\r
 #include "../StdAfx.h"\r
 \r
 #ifdef _MSC_VER\r
@@ -25,8 +26,6 @@
 \r
 #include "output.h"\r
 \r
-#include "../video_channel_context.h"\r
-\r
 #include "../video_format.h"\r
 #include "../mixer/gpu/ogl_device.h"\r
 #include "../mixer/read_frame.h"\r
 #include <common/utility/timer.h>\r
 #include <common/memory/memshfl.h>\r
 \r
-#include <tbb/mutex.h>\r
-\r
-namespace caspar { namespace core {\r
-\r
-class key_read_frame_muxer : public core::read_frame\r
-{\r
-       ogl_device&                                              ogl_;\r
-       safe_ptr<read_frame>                     fill_;\r
-       std::shared_ptr<host_buffer>     key_;\r
-       tbb::mutex                                           mutex_;\r
-public:\r
-       key_read_frame_muxer(ogl_device& ogl, const safe_ptr<read_frame>& fill)\r
-               : ogl_(ogl)\r
-               , fill_(fill)\r
-       {\r
-       }\r
-\r
-       virtual const boost::iterator_range<const uint8_t*> image_data()\r
-       {\r
-               tbb::mutex::scoped_lock lock(mutex_);\r
-               if(!key_)\r
-               {\r
-                       key_ = ogl_.create_host_buffer(fill_->image_data().size(), host_buffer::write_only);                            \r
-                       fast_memsfhl(key_->data(), fill_->image_data().begin(), fill_->image_data().size(), 0x0F0F0F0F, 0x0B0B0B0B, 0x07070707, 0x03030303);\r
-               }\r
+#include <concrt_extras.h>\r
 \r
-               auto ptr = static_cast<const uint8_t*>(key_->data());\r
-               return boost::iterator_range<const uint8_t*>(ptr, ptr + key_->size());\r
-       }\r
+using namespace Concurrency;\r
 \r
-       virtual const boost::iterator_range<const int16_t*> audio_data()\r
-       {\r
-               return fill_->audio_data();\r
-       }       \r
-};\r
+namespace caspar { namespace core {\r
        \r
 struct output::implementation\r
 {      \r
        typedef std::pair<safe_ptr<read_frame>, safe_ptr<read_frame>> fill_and_key;\r
        \r
-       video_channel_context& channel_;\r
+       const video_format_desc format_desc_;\r
 \r
        std::map<int, safe_ptr<frame_consumer>> consumers_;\r
        typedef std::map<int, safe_ptr<frame_consumer>>::value_type layer_t;\r
        \r
        high_prec_timer timer_;\r
+\r
+       critical_section                                                                mutex_;\r
+       call<safe_ptr<message<safe_ptr<read_frame>>>>   output_;\r
                \r
 public:\r
-       implementation(video_channel_context& video_channel) \r
-               : channel_(video_channel){}     \r
+       implementation(output::source_t& source, const video_format_desc& format_desc) \r
+               : format_desc_(format_desc)\r
+               , output_(std::bind(&implementation::execute, this, std::placeholders::_1))\r
+       {\r
+               source.link_target(&output_);\r
+       }       \r
        \r
        void add(int index, safe_ptr<frame_consumer>&& consumer)\r
        {               \r
-               consumer->initialize(channel_.get_format_desc());\r
-               channel_.execution().invoke([&]\r
+               remove(index);\r
+               wait(100); // Wait a bit to allow asynchronous destruction to finish.\r
+\r
+               consumer->initialize(format_desc_);             \r
+               \r
                {\r
-                       consumers_.erase(index);\r
+                       critical_section::scoped_lock lock(mutex_);\r
+\r
                        consumers_.insert(std::make_pair(index, consumer));\r
 \r
-                       CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
-               });\r
+                       CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added."; \r
+               }\r
        }\r
 \r
        void remove(int index)\r
        {\r
-               channel_.execution().invoke([&]\r
                {\r
+                       critical_section::scoped_lock lock(mutex_);\r
+\r
                        auto it = consumers_.find(index);\r
                        if(it != consumers_.end())\r
                        {\r
                                CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
                                consumers_.erase(it);\r
                        }\r
-               });\r
+               }\r
        }\r
                                                \r
-       void execute(const safe_ptr<read_frame>& frame)\r
-       {               \r
-               try\r
-               {               \r
-                       if(!has_synchronization_clock())\r
-                               timer_.tick(1.0/channel_.get_format_desc().fps);\r
-                                               \r
-                       auto fill = frame;\r
-                       auto key = make_safe<key_read_frame_muxer>(channel_.ogl(), frame);\r
+       void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+       {       \r
+               auto frame = msg->value();\r
 \r
-                       auto it = consumers_.begin();\r
-                       while(it != consumers_.end())\r
-                       {\r
+               {\r
+                       critical_section::scoped_lock lock(mutex_);             \r
+\r
+                       if(!has_synchronization_clock() || frame->image_size() != format_desc_.size)\r
+                       {               \r
+                               scoped_oversubcription_token oversubscribe;\r
+                               timer_.tick(1.0/format_desc_.fps);\r
+                       }\r
+       \r
+                       std::vector<int> removables;            \r
+                       Concurrency::parallel_for_each(consumers_.begin(), consumers_.end(), [&](const decltype(*consumers_.begin())& pair)\r
+                       {               \r
                                try\r
                                {\r
-                                       auto consumer = it->second;\r
-\r
-                                       if(consumer->get_video_format_desc() != channel_.get_format_desc())\r
-                                               consumer->initialize(channel_.get_format_desc());\r
-\r
-                                       auto frame = consumer->key_only() ? key : fill;\r
-\r
-                                       if(static_cast<size_t>(frame->image_data().size()) == consumer->get_video_format_desc().size)\r
-                                               consumer->send(frame);\r
-\r
-                                       ++it;\r
+                                       if(!pair.second->send(frame))\r
+                                               removables.push_back(pair.first);\r
                                }\r
                                catch(...)\r
-                               {\r
+                               {               \r
                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
-                                       CASPAR_LOG(error) << print() << L" " << it->second->print() << L" Removed.";\r
-                                       consumers_.erase(it++);\r
+                                       CASPAR_LOG(error) << "Consumer error. Trying to recover:" << pair.second->print();\r
+                                       try\r
+                                       {\r
+                                               pair.second->initialize(format_desc_);\r
+                                               pair.second->send(frame);\r
+                                       }\r
+                                       catch(...)\r
+                                       {\r
+                                               removables.push_back(pair.first);                               \r
+                                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                               CASPAR_LOG(error) << "Failed to recover consumer: " << pair.second->print() << L". Removing it.";\r
+                                       }\r
                                }\r
-                       }\r
-               }\r
-               catch(...)\r
-               {\r
-                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                       });\r
+\r
+                       BOOST_FOREACH(auto& removable, removables)\r
+                               consumers_.erase(removable);            \r
                }\r
        }\r
 \r
@@ -169,8 +153,7 @@ private:
        }\r
 };\r
 \r
-output::output(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
+output::output(output::source_t& source, const video_format_desc& format_desc) : impl_(new implementation(source, format_desc)){}\r
 void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
 void output::remove(int index){impl_->remove(index);}\r
-void output::execute(const safe_ptr<read_frame>& frame) {impl_->execute(frame); }\r
 }}
\ No newline at end of file