, device_index_(device_index)\r
, frame_factory_(frame_factory)\r
, tail_(core::basic_frame::empty())\r
- , filter_(filter)\r
+ , filter_(filter, filter::low_latency)\r
, muxer_(double_rate(filter) ? format_desc.fps * 2.0 : format_desc.fps, frame_factory->get_video_format_desc(), frame_factory)\r
{\r
frame_buffer_.set_capacity(2);\r
av_frame->interlaced_frame = format_desc_.mode != core::video_mode::progressive;\r
av_frame->top_field_first = format_desc_.mode == core::video_mode::upper ? 1 : 0;\r
\r
- filter_.push(av_frame);\r
- BOOST_FOREACH(auto& av_frame2, filter_.poll())\r
+ BOOST_FOREACH(auto& av_frame2, filter_.execute(av_frame))\r
muxer_.push(make_write_frame(this, av_frame2, frame_factory_)); \r
\r
// It is assumed that audio is always equal or ahead of video.\r
\r
#include <boost/circular_buffer.hpp>\r
\r
+#include <tbb/task_group.h>\r
+\r
#include <cstdio>\r
#include <sstream>\r
\r
std::shared_ptr<AVFilterGraph> graph_; \r
AVFilterContext* buffersink_ctx_;\r
AVFilterContext* buffersrc_ctx_;\r
+\r
+ filter::flags flags_;\r
+ std::vector<safe_ptr<AVFrame>> frames_;\r
+ tbb::task_group tasks_;\r
\r
- implementation(const std::wstring& filters) \r
+ implementation(const std::wstring& filters, filter::flags filter_flags) \r
: filters_(filters.empty() ? "null" : narrow(filters))\r
+ , flags_(filter_flags)\r
{\r
std::transform(filters_.begin(), filters_.end(), filters_.begin(), ::tolower);\r
}\r
\r
+ std::vector<safe_ptr<AVFrame>> execute(const std::shared_ptr<AVFrame>& frame)\r
+ {\r
+ if((flags_ | filter::low_latency) != 0)\r
+ {\r
+ push(frame);\r
+ return poll();\r
+ }\r
+\r
+ tasks_.wait();\r
+ \r
+ push(frame);\r
+\r
+ auto frames = std::move(frames_);\r
+\r
+ tasks_.run([=]\r
+ {\r
+ frames_ = poll();\r
+ });\r
+\r
+ return frames;\r
+ }\r
+\r
void push(const std::shared_ptr<AVFrame>& frame)\r
{ \r
+ if(!frame)\r
+ return;\r
+\r
int errn = 0; \r
\r
if(!graph_)\r
}\r
};\r
\r
-filter::filter(const std::wstring& filters) : impl_(new implementation(filters)){}\r
-void filter::push(const std::shared_ptr<AVFrame>& frame) {impl_->push(frame);}\r
-std::vector<safe_ptr<AVFrame>> filter::poll() {return impl_->poll();}\r
+filter::filter(const std::wstring& filters, flags filter_flags) : impl_(new implementation(filters, filter_flags)){}\r
+std::vector<safe_ptr<AVFrame>> filter::execute(const std::shared_ptr<AVFrame>& frame) {return impl_->execute(frame);}\r
}
\ No newline at end of file
class filter\r
{\r
public:\r
- filter(const std::wstring& filters);\r
+ enum flags\r
+ {\r
+ none = 0,\r
+ low_latency = 2\r
+ };\r
\r
- void push(const std::shared_ptr<AVFrame>& frame);\r
- std::vector<safe_ptr<AVFrame>> poll();\r
+ filter(const std::wstring& filters, flags filter_flags = none);\r
+\r
+ std::vector<safe_ptr<AVFrame>> execute(const std::shared_ptr<AVFrame>& frame);\r
\r
private:\r
struct implementation;\r
\r
auto frame = pop_video();\r
\r
- filter_->push(as_av_frame(frame));\r
- auto av_frames = filter_->poll();\r
+ auto av_frames = filter_->execute(as_av_frame(frame));\r
\r
if(av_frames.size() < 2)\r
return;\r
filter_.reset(new filter(L"YADIF=0:-1"));\r
\r
auto frame = pop_video();\r
-\r
- filter_->push(as_av_frame(frame));\r
- auto av_frames = filter_->poll();\r
+ \r
+ auto av_frames = filter_->execute(as_av_frame(frame));\r
\r
if(av_frames.empty())\r
return;\r
result.push_back(make_safe<core::write_frame>(this));\r
else if(!packet_buffer_.empty())\r
{\r
- std::vector<safe_ptr<AVFrame>> av_frames;\r
+ std::shared_ptr<AVFrame> frame;\r
\r
auto packet = std::move(packet_buffer_.front());\r
\r
if(packet)\r
{\r
- decode(*packet, av_frames); \r
+ frame = decode(*packet); \r
packet_buffer_.pop();\r
}\r
else\r
{\r
- bool flush = true;\r
-\r
if(codec_context_->codec->capabilities | CODEC_CAP_DELAY)\r
{\r
AVPacket pkt = {0};\r
- flush = !decode(pkt, av_frames);\r
+ frame = decode(pkt);\r
}\r
\r
- if(flush)\r
+ if(!frame)\r
{ \r
packet_buffer_.pop();\r
avcodec_flush_buffers(codec_context_.get());\r
}\r
}\r
\r
- if(filter_)\r
- {\r
- BOOST_FOREACH(auto& frame, av_frames) \r
- filter_->push(frame);\r
- \r
- av_frames = filter_->poll();\r
- }\r
+ std::vector<safe_ptr<AVFrame>> av_frames;\r
+\r
+ if(filter_) \r
+ av_frames = filter_->execute(frame); \r
+ else if(frame)\r
+ av_frames.push_back(make_safe(frame));\r
\r
BOOST_FOREACH(auto& frame, av_frames)\r
result.push_back(make_write_frame(this, frame, frame_factory_));\r
return result;\r
}\r
\r
- bool decode(AVPacket& packet, std::vector<safe_ptr<AVFrame>>& av_frames)\r
+ std::shared_ptr<AVFrame> decode(AVPacket& packet)\r
{\r
std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
\r
\r
if(frame_finished != 0) \r
{\r
- av_frames.push_back(make_safe(decoded_frame));\r
+ return decoded_frame;\r
if(decoded_frame->repeat_pict != 0)\r
CASPAR_LOG(warning) << "video_decoder: repeat_pict not implemented.";\r
}\r
\r
- return frame_finished != 0;\r
+ return nullptr;\r
}\r
\r
bool ready() const\r