\r
#include <boost/foreach.hpp>\r
\r
-#include <tbb/parallel_for_each.h>\r
-\r
#include <map>\r
#include <set>\r
\r
\r
try\r
{\r
- // Allocate placeholders.\r
- BOOST_FOREACH(auto layer, layers_)\r
- frames[layer.first] = basic_frame::empty();\r
-\r
- // Render layers\r
- tbb::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer)\r
- {\r
+ BOOST_FOREACH(auto& layer, layers_)\r
frames[layer.first] = layer.second.receive();\r
- });\r
}\r
catch(...)\r
{\r
else\r
muxer_.push(std::make_shared<std::vector<int16_t>>(frame_factory_->get_video_format_desc().audio_samples_per_frame, 0));\r
\r
+ muxer_.commit();\r
+\r
while(!muxer_.empty())\r
{\r
if(!frame_buffer_.try_push(muxer_.pop()))\r
\r
#include <tbb/atomic.h>\r
#include <tbb/concurrent_queue.h>\r
-#include <tbb/parallel_invoke.h>\r
#include <tbb/parallel_for.h>\r
\r
#include <boost/assign.hpp>\r
\r
std::vector<std::shared_ptr<std::vector<int16_t>>> result;\r
\r
- if(packets_.empty())\r
- return result;\r
- \r
- auto packet = packets_.front();\r
-\r
- if(packet) \r
- {\r
- result.push_back(decode(*packet));\r
- if(packet->size == 0) \r
+ while(!packets_.empty() && result.empty())\r
+ { \r
+ auto packet = packets_.front();\r
+\r
+ if(packet) \r
+ {\r
+ result.push_back(decode(*packet));\r
+ if(packet->size == 0) \r
+ packets_.pop();\r
+ }\r
+ else \r
+ { \r
+ avcodec_flush_buffers(codec_context_.get());\r
+ result.push_back(nullptr);\r
packets_.pop();\r
+ } \r
}\r
- else \r
- { \r
- avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
- packets_.pop();\r
- } \r
\r
return result;\r
}\r
\r
bool ready() const\r
{\r
- return !codec_context_ || !packets_.empty();\r
+ return !codec_context_ || packets_.size() > 2;\r
}\r
};\r
\r
#include <boost/range/algorithm/find.hpp>\r
\r
#include <tbb/task_group.h>\r
-#include <tbb/parallel_invoke.h>\r
\r
namespace caspar {\r
\r
\r
const safe_ptr<diagnostics::graph> graph_;\r
boost::timer frame_timer_;\r
+ boost::timer video_timer_;\r
+ boost::timer audio_timer_;\r
\r
const safe_ptr<core::frame_factory> frame_factory_;\r
const core::video_format_desc format_desc_;\r
const bool loop_;\r
\r
safe_ptr<core::basic_frame> last_frame_;\r
+\r
+ tbb::task_group tasks_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, int length) \r
{\r
graph_->add_guide("frame-time", 0.5);\r
graph_->set_color("frame-time", diagnostics::color(1.0f, 0.0f, 0.0f));\r
+ graph_->set_color("video-time", diagnostics::color(1.0f, 1.0f, 0.0f));\r
+ graph_->set_color("audio-time", diagnostics::color(0.2f, 1.0f, 0.2f));\r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
\r
for(int n = 0; n < 128 && muxer_.size() < 2; ++n)\r
decode_frame(0);\r
}\r
+\r
+ ~ffmpeg_producer()\r
+ {\r
+ tasks_.cancel();\r
+ tasks_.wait();\r
+ }\r
\r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
auto frame = core::basic_frame::late();\r
\r
frame_timer_.restart();\r
-\r
- for(int n = 0; n < 64 && muxer_.size() < 2; ++n)\r
- decode_frame(hints);\r
\r
+ for(int n = 0; n < 8 && muxer_.empty(); ++n)\r
+ decode_frame(hints);\r
+\r
if(!muxer_.empty())\r
frame = last_frame_ = muxer_.pop(); \r
else\r
\r
void decode_frame(int hints)\r
{\r
- for(int n = 0; n < 32 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
+ tasks_.wait();\r
+\r
+ muxer_.commit();\r
+\r
+ for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
{\r
std::shared_ptr<AVPacket> pkt;\r
if(input_.try_pop(pkt))\r
audio_decoder_.push(pkt);\r
}\r
}\r
-\r
- decltype(video_decoder_.poll()) video_frames;\r
- decltype(audio_decoder_.poll()) audio_samples;\r
\r
- tbb::parallel_invoke(\r
- [&]\r
+ if(!muxer_.video_ready())\r
{\r
- if(!muxer_.video_ready())\r
- video_frames = video_decoder_.poll();\r
- },\r
- [&]\r
+ tasks_.run([=]\r
+ {\r
+ video_timer_.restart();\r
+\r
+ auto video_frames = video_decoder_.poll();\r
+ BOOST_FOREACH(auto& video, video_frames) \r
+ muxer_.push(video, hints); \r
+\r
+ graph_->update_value("video-time", static_cast<float>(video_timer_.elapsed()*format_desc_.fps*0.5));\r
+ });\r
+ } \r
+\r
+ if(!muxer_.audio_ready())\r
{\r
- if(!muxer_.audio_ready())\r
- audio_samples = audio_decoder_.poll();\r
- });\r
- \r
- BOOST_FOREACH(auto& audio, audio_samples)\r
- muxer_.push(audio);\r
+ tasks_.run([=]\r
+ {\r
+ audio_timer_.restart();\r
+ \r
+ auto audio_samples = audio_decoder_.poll();\r
+ BOOST_FOREACH(auto& audio, audio_samples)\r
+ muxer_.push(audio); \r
\r
- BOOST_FOREACH(auto& video, video_frames) \r
- muxer_.push(video, hints); \r
+ graph_->update_value("audio-time", static_cast<float>(audio_timer_.elapsed()*format_desc_.fps*0.5));\r
+ }); \r
+ }\r
}\r
\r
virtual int64_t nb_frames() const\r
video_streams_.back().push(make_safe<core::write_frame>(this));\r
++video_frame_count_;\r
display_mode_ = display_mode::simple;\r
- put_frames(frame_buffer_);\r
return;\r
}\r
\r
\r
video_streams_.back().push(frame);\r
++video_frame_count_;\r
-\r
- put_frames(frame_buffer_);\r
}\r
}\r
\r
audio_sample_count_ += audio_samples->size();\r
\r
boost::range::push_back(audio_streams_.back(), *audio_samples);\r
- put_frames(frame_buffer_);\r
}\r
\r
safe_ptr<basic_frame> pop()\r
return audio_streams_.back().size() / format_desc_.audio_samples_per_frame;\r
}\r
\r
- void put_frames(std::deque<safe_ptr<basic_frame>>& dest)\r
+ void commit()\r
{\r
if(video_streams_.size() > 1 && audio_streams_.size() > 1 &&\r
(video_streams_.front().empty() || audio_streams_.front().empty()))\r
\r
switch(display_mode_)\r
{\r
- case display_mode::simple: return simple(dest);\r
- case display_mode::duplicate: return duplicate(dest);\r
- case display_mode::half: return half(dest);\r
- case display_mode::interlace: return interlace(dest);\r
- case display_mode::deinterlace_bob: return simple(dest);\r
- case display_mode::deinterlace_bob_reinterlace: return interlace(dest);\r
- case display_mode::deinterlace: return simple(dest);\r
+ case display_mode::simple: return simple(frame_buffer_);\r
+ case display_mode::duplicate: return duplicate(frame_buffer_);\r
+ case display_mode::half: return half(frame_buffer_);\r
+ case display_mode::interlace: return interlace(frame_buffer_);\r
+ case display_mode::deinterlace_bob: return simple(frame_buffer_);\r
+ case display_mode::deinterlace_bob_reinterlace: return interlace(frame_buffer_);\r
+ case display_mode::deinterlace: return simple(frame_buffer_);\r
default: BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
}\r
}\r
: impl_(new implementation(in_fps, frame_factory)){}\r
void frame_muxer::push(const std::shared_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
void frame_muxer::push(const std::shared_ptr<std::vector<int16_t>>& audio_samples){return impl_->push(audio_samples);}\r
+void frame_muxer::commit(){impl_->commit();}\r
safe_ptr<basic_frame> frame_muxer::pop(){return impl_->pop();}\r
size_t frame_muxer::size() const {return impl_->size();}\r
bool frame_muxer::empty() const {return impl_->size() == 0;}\r
void push(const std::shared_ptr<AVFrame>& video_frame, int hints = 0);\r
void push(const std::shared_ptr<std::vector<int16_t>>& audio_samples);\r
\r
+ void commit();\r
+\r
bool video_ready() const;\r
bool audio_ready() const;\r
\r
#include <core/mixer/write_frame.h>\r
\r
#include <common/exception/exceptions.h>\r
-#include <common/memory/memcpy.h>\r
+\r
+#include <tbb/parallel_for.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
auto write = frame_factory->create_frame(tag, desc.pix_fmt != core::pixel_format::invalid ? desc : get_pixel_format_desc(PIX_FMT_BGRA, width, height));\r
write->set_type(get_mode(*decoded_frame));\r
\r
- tbb::parallel_for(0, static_cast<int>(desc.planes.size()), 1, [&](int n)\r
+ for(int n = 0; n < static_cast<int>(desc.planes.size()); ++n)\r
{\r
auto plane = desc.planes[n];\r
auto result = write->image_data(n).begin();\r
auto decoded_linesize = decoded_frame->linesize[n];\r
\r
// Copy line by line since ffmpeg sometimes pads each line.\r
+ tbb::affinity_partitioner ap;\r
tbb::parallel_for(tbb::blocked_range<size_t>(0, static_cast<int>(desc.planes[n].height)), [&](const tbb::blocked_range<size_t>& r)\r
{\r
for(size_t y = r.begin(); y != r.end(); ++y)\r
- fast_memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
- });\r
+ memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
+ }, ap);\r
\r
write->commit(n);\r
- });\r
+ }\r
+\r
+ //for(int n = 0; n < static_cast<int>(desc.planes.size()); ++n)\r
+ //{\r
+ // auto plane = desc.planes[n];\r
+ // auto result = write->image_data(n).begin();\r
+ // auto decoded = decoded_frame->data[n];\r
+ // auto decoded_linesize = decoded_frame->linesize[n];\r
+ // \r
+ // for(size_t y = 0; y < static_cast<int>(desc.planes[n].height); ++y)\r
+ // fast_memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
+\r
+ // write->commit(n);\r
+ //}\r
\r
return write;\r
}\r
\r
std::vector<std::shared_ptr<AVFrame>> result;\r
\r
- if(packets_.empty())\r
- return result;\r
-\r
- auto packet = packets_.front();\r
- \r
- if(packet)\r
+ while(!packets_.empty() && result.empty())\r
{\r
- auto frame = decode(*packet);\r
- boost::range::push_back(result, filter_.execute(frame));\r
- if(packet->size == 0)\r
- packets_.pop();\r
- }\r
- else\r
- {\r
- if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ auto packet = packets_.front();\r
+ \r
+ if(packet)\r
{\r
- AVPacket pkt;\r
- av_init_packet(&pkt);\r
- pkt.data = nullptr;\r
- pkt.size = 0;\r
- auto frame = decode(pkt);\r
- boost::range::push_back(result, filter_.execute(frame)); \r
+ auto frame = decode(*packet);\r
+ boost::range::push_back(result, filter_.execute(frame));\r
+ if(packet->size == 0)\r
+ packets_.pop();\r
}\r
-\r
- if(result.empty())\r
- { \r
- packets_.pop();\r
- avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
+ else\r
+ {\r
+ if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ {\r
+ AVPacket pkt;\r
+ av_init_packet(&pkt);\r
+ pkt.data = nullptr;\r
+ pkt.size = 0;\r
+ auto frame = decode(pkt);\r
+ boost::range::push_back(result, filter_.execute(frame)); \r
+ }\r
+\r
+ if(result.empty())\r
+ { \r
+ packets_.pop();\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ result.push_back(nullptr);\r
+ }\r
}\r
}\r
\r
\r
bool ready() const\r
{\r
- return !codec_context_ || !packets_.empty();\r
+ return !codec_context_ || packets_.size() > 2;\r
}\r
\r
double fps() const\r