[&]{image = mix_image(frames);}, \r
[&]{audio = mix_audio(frames);});\r
\r
- return make_safe<read_frame>(channel_.ogl(), std::move(image.get()), std::move(audio));\r
+ return make_safe<read_frame>(channel_.ogl(), channel_.get_format_desc().size, std::move(image), std::move(audio));\r
}\r
catch(...)\r
{\r
#include "gpu/host_buffer.h" \r
#include "gpu/ogl_device.h"\r
\r
+#include <tbb/mutex.h>\r
+\r
namespace caspar { namespace core {\r
\r
struct read_frame::implementation : boost::noncopyable\r
{\r
- ogl_device& ogl_;\r
- safe_ptr<host_buffer> image_data_;\r
- std::vector<int16_t> audio_data_;\r
+ ogl_device& ogl_;\r
+ size_t size_;\r
+ boost::unique_future<safe_ptr<host_buffer>> future_image_data_;\r
+ std::shared_ptr<host_buffer> image_data_;\r
+ tbb::mutex mutex_;\r
+ std::vector<int16_t> audio_data_;\r
\r
public:\r
- implementation(ogl_device& ogl, safe_ptr<host_buffer>&& image_data, std::vector<int16_t>&& audio_data) \r
+ implementation(ogl_device& ogl, size_t size, boost::unique_future<safe_ptr<host_buffer>>&& image_data, std::vector<int16_t>&& audio_data) \r
: ogl_(ogl)\r
- , image_data_(std::move(image_data))\r
+ , size_(size)\r
+ , future_image_data_(std::move(image_data))\r
, audio_data_(std::move(audio_data)){} \r
\r
const boost::iterator_range<const uint8_t*> image_data()\r
{\r
- if(!image_data_->data())\r
{\r
- image_data_->wait(ogl_);\r
- ogl_.invoke([=]{image_data_->map();}, high_priority);\r
+ tbb::mutex::scoped_lock lock(mutex_);\r
+\r
+ if(!image_data_)\r
+ {\r
+ image_data_ = future_image_data_.get();\r
+ image_data_.get()->wait(ogl_);\r
+ ogl_.invoke([=]{image_data_.get()->map();}, high_priority);\r
+ }\r
}\r
\r
auto ptr = static_cast<const uint8_t*>(image_data_->data());\r
}\r
};\r
\r
-read_frame::read_frame(ogl_device& ogl, safe_ptr<host_buffer>&& image_data, std::vector<int16_t>&& audio_data) \r
- : impl_(new implementation(ogl, std::move(image_data), std::move(audio_data))){}\r
+read_frame::read_frame(ogl_device& ogl, size_t size, boost::unique_future<safe_ptr<host_buffer>>&& image_data, std::vector<int16_t>&& audio_data) \r
+ : impl_(new implementation(ogl, size, std::move(image_data), std::move(audio_data))){}\r
read_frame::read_frame(){}\r
const boost::iterator_range<const uint8_t*> read_frame::image_data()\r
{\r
return impl_ ? impl_->audio_data() : boost::iterator_range<const int16_t*>();\r
}\r
\r
-size_t read_frame::image_size() const{return impl_ ? impl_->image_data_->size() : 0;}\r
+size_t read_frame::image_size() const{return impl_ ? impl_->size_ : 0;}\r
\r
//#include <tbb/scalable_allocator.h>\r
//#include <tbb/parallel_for.h>\r
\r
#include <boost/noncopyable.hpp>\r
#include <boost/range/iterator_range.hpp>\r
+#include <boost/thread/future.hpp>\r
\r
#include <cstdint>\r
#include <memory>\r
{\r
public:\r
read_frame();\r
- read_frame(ogl_device& ogl, safe_ptr<host_buffer>&& image_data, std::vector<int16_t>&& audio_data);\r
+ read_frame(ogl_device& ogl, size_t size, boost::unique_future<safe_ptr<host_buffer>>&& image_data, std::vector<int16_t>&& audio_data);\r
\r
virtual const boost::iterator_range<const uint8_t*> image_data();\r
virtual const boost::iterator_range<const int16_t*> audio_data();\r
\r
#include <core/producer/frame/basic_frame.h>\r
\r
+#include <tbb/parallel_invoke.h>\r
+\r
namespace caspar { namespace core { \r
\r
struct separated_producer : public frame_producer\r
\r
virtual safe_ptr<basic_frame> receive(int hints)\r
{\r
- if(fill_ == core::basic_frame::late())\r
- fill_ = receive_and_follow(fill_producer_, hints);\r
- \r
- if(key_ == core::basic_frame::late())\r
- key_ = receive_and_follow(key_producer_, hints | ALPHA_HINT);\r
+ tbb::parallel_invoke\r
+ (\r
+ [&]\r
+ {\r
+ if(fill_ == core::basic_frame::late())\r
+ fill_ = receive_and_follow(fill_producer_, hints);\r
+ },\r
+ [&]\r
+ {\r
+ if(key_ == core::basic_frame::late())\r
+ key_ = receive_and_follow(key_producer_, hints | ALPHA_HINT);\r
+ }\r
+ );\r
\r
if(fill_ == basic_frame::eof())\r
return basic_frame::eof();\r
\r
#include <boost/foreach.hpp>\r
\r
+#include <tbb/parallel_for_each.h>\r
+\r
#include <map>\r
#include <set>\r
\r
\r
struct stage::implementation : boost::noncopyable\r
{ \r
- std::map<int, layer> layers_; \r
+ std::map<int, layer> layers_; \r
video_channel_context& channel_;\r
public:\r
implementation(video_channel_context& video_channel) \r
\r
try\r
{\r
- BOOST_FOREACH(auto& layer, layers_)\r
+ // Allocate placeholders.\r
+ BOOST_FOREACH(auto layer, layers_)\r
+ frames[layer.first] = basic_frame::empty();\r
+\r
+ // Render layers\r
+ tbb::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer)\r
+ {\r
frames[layer.first] = layer.second.receive();\r
+ });\r
}\r
catch(...)\r
{\r
#include <core/producer/frame/image_transform.h>\r
#include <core/producer/frame/audio_transform.h>\r
\r
+#include <tbb/parallel_invoke.h>\r
+\r
namespace caspar { namespace core { \r
\r
struct transition_producer : public frame_producer\r
if(current_frame_++ >= info_.duration)\r
return basic_frame::eof();\r
\r
- auto dest = receive_and_follow(dest_producer_, hints);\r
- if(dest == core::basic_frame::late())\r
- dest = dest_producer_->last_frame();\r
- \r
- auto source = receive_and_follow(source_producer_, hints);\r
- if(source == core::basic_frame::late())\r
- source = source_producer_->last_frame();\r
+ auto dest = core::basic_frame::empty();\r
+ auto source = core::basic_frame::empty();\r
+\r
+ tbb::parallel_invoke\r
+ (\r
+ [&]\r
+ {\r
+ dest = receive_and_follow(dest_producer_, hints);\r
+ if(dest == core::basic_frame::late())\r
+ dest = dest_producer_->last_frame();\r
+ },\r
+ [&]\r
+ {\r
+ source = receive_and_follow(source_producer_, hints);\r
+ if(source == core::basic_frame::late())\r
+ source = source_producer_->last_frame();\r
+ }\r
+ );\r
\r
return last_frame_ = compose(dest, source);\r
}\r
\r
safe_ptr<core::basic_frame> last_frame_;\r
\r
- tbb::task_group tasks_;\r
-\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, int length) \r
: filename_(filename)\r
for(int n = 0; n < 128 && muxer_.size() < 2; ++n)\r
decode_frame(0);\r
}\r
-\r
- ~ffmpeg_producer()\r
- {\r
- tasks_.cancel();\r
- tasks_.wait();\r
- }\r
- \r
+ \r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
- tasks_.wait();\r
-\r
auto frame = core::basic_frame::late();\r
\r
+ frame_timer_.restart();\r
+\r
+ for(int n = 0; n < 64 && muxer_.size() < 2; ++n)\r
+ decode_frame(hints);\r
+ \r
if(!muxer_.empty())\r
frame = last_frame_ = muxer_.pop(); \r
else\r
++nb_frames_; \r
}\r
}\r
-\r
- tasks_.run([=]\r
- {\r
- frame_timer_.restart();\r
-\r
- for(int n = 0; n < 64 && muxer_.size() < 2; ++n)\r
- decode_frame(hints);\r
-\r
- graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
- });\r
+ \r
+ graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
\r
return frame;\r
}\r