}\r
\r
#pragma warning(pop)\r
+\r
+#undef Yield
\ No newline at end of file
#pragma warning (pop)\r
#endif\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
struct audio_decoder::implementation : boost::noncopyable\r
{ \r
+ audio_decoder::source_t& source_;\r
int index_;\r
- std::shared_ptr<AVCodecContext> codec_context_; \r
+ const safe_ptr<AVCodecContext> codec_context_; \r
const core::video_format_desc format_desc_;\r
audio_resampler resampler_;\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- std::queue<std::shared_ptr<AVPacket>> packets_;\r
+ std::queue<safe_ptr<AVPacket>> packets_;\r
public:\r
- explicit implementation(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
- : codec_context_(open_codec(*context, AVMEDIA_TYPE_AUDIO, index_))\r
+ explicit implementation(audio_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
+ : source_(source)\r
+ , codec_context_(open_codec(*context, AVMEDIA_TYPE_AUDIO, index_))\r
, format_desc_(format_desc) \r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
, resampler_(format_desc_.audio_channels, codec_context_->channels,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
{ \r
}\r
-\r
- void push(const std::shared_ptr<AVPacket>& packet)\r
- { \r
- if(packet && packet->stream_index != index_)\r
- return;\r
-\r
- packets_.push(packet);\r
- } \r
- \r
- std::vector<std::shared_ptr<core::audio_buffer>> poll()\r
+ \r
+ std::shared_ptr<core::audio_buffer> poll()\r
{\r
- std::vector<std::shared_ptr<core::audio_buffer>> result;\r
-\r
+ auto packet = create_packet();\r
+ \r
if(packets_.empty())\r
- return result;\r
- \r
- auto packet = packets_.front();\r
-\r
- if(packet) \r
{\r
- result.push_back(decode(*packet));\r
- if(packet->size == 0) \r
- packets_.pop();\r
+ if(!try_receive(source_, packet) || packet->stream_index != index_)\r
+ return nullptr;\r
+ else\r
+ packets_.push(packet);\r
}\r
- else \r
+ \r
+ packet = packets_.front();\r
+ \r
+ std::shared_ptr<core::audio_buffer> audio;\r
+ if(packet == loop_packet()) \r
{ \r
- avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
+ avcodec_flush_buffers(codec_context_.get()); \r
+ audio = loop_audio();\r
+ } \r
+ else\r
+ audio = decode(*packet);\r
+ \r
+ if(packet->size == 0) \r
packets_.pop();\r
- } \r
\r
- return result;\r
+ return audio;\r
}\r
\r
std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
\r
return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
}\r
-\r
- bool ready() const\r
- {\r
- return !packets_.empty();\r
- }\r
};\r
\r
-audio_decoder::audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){}\r
-void audio_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-bool audio_decoder::ready() const{return impl_->ready();}\r
-std::vector<std::shared_ptr<core::audio_buffer>> audio_decoder::poll(){return impl_->poll();}\r
+audio_decoder::audio_decoder(audio_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(source, context, format_desc)){}\r
+std::shared_ptr<core::audio_buffer> audio_decoder::poll(){return impl_->poll();}\r
\r
}}
\ No newline at end of file
\r
}\r
\r
+#include <agents.h>\r
+\r
namespace ffmpeg {\r
\r
class audio_decoder : boost::noncopyable\r
{\r
public:\r
- explicit audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
+ typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
+\r
+ explicit audio_decoder(source_t& source, const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
\r
- void push(const std::shared_ptr<AVPacket>& packet);\r
- bool ready() const;\r
- std::vector<std::shared_ptr<core::audio_buffer>> poll();\r
+ std::shared_ptr<core::audio_buffer> poll();\r
\r
private:\r
struct implementation;\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> resample(std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>>&& data)\r
{\r
- if(resampler_)\r
+ if(resampler_ && !data.empty())\r
{\r
buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
auto ret = audio_resample(resampler_.get(),\r
const safe_ptr<core::frame_factory> frame_factory_;\r
const core::video_format_desc format_desc_;\r
\r
- unbounded_buffer<std::shared_ptr<AVPacket>> packets_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> video_packets_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> audio_packets_;\r
+ unbounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
+\r
input input_; \r
video_decoder video_decoder_;\r
audio_decoder audio_decoder_; \r
, frame_factory_(frame_factory) \r
, format_desc_(frame_factory->get_video_format_desc())\r
, input_(packets_, graph_, filename_, loop, start, length)\r
- , video_decoder_(input_.context(), frame_factory)\r
- , audio_decoder_(input_.context(), frame_factory->get_video_format_desc())\r
+ , video_decoder_(video_packets_, input_.context(), frame_factory)\r
+ , audio_decoder_(audio_packets_, input_.context(), frame_factory->get_video_format_desc())\r
, fps_(video_decoder_.fps())\r
- , muxer_(fps_, frame_factory, filter)\r
+ , muxer_(frames_, fps_, frame_factory, filter)\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
\r
frame_timer_.restart();\r
\r
- for(int n = 0; n < 64 && muxer_.empty(); ++n)\r
+ for(int n = 0; n < 64 && !try_receive(frames_, frame); ++n)\r
decode_frame(hints);\r
\r
graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
\r
- if(!muxer_.empty())\r
- frame = last_frame_ = muxer_.pop(); \r
+ if(frame != core::basic_frame::late())\r
+ last_frame_ = frame; \r
else\r
{\r
if(input_.eof())\r
{\r
return disable_audio(last_frame_);\r
}\r
-\r
- void push_packets()\r
+ \r
+ void decode_frame(int hints)\r
{\r
- for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
+ if(!muxer_.need_video())\r
{\r
- std::shared_ptr<AVPacket> pkt;\r
- if(try_receive(packets_, pkt))\r
+ std::shared_ptr<AVFrame> video;\r
+ while(!video)\r
{\r
- video_decoder_.push(pkt);\r
- audio_decoder_.push(pkt);\r
+ auto pkt = create_packet();\r
+ if(try_receive(packets_, pkt))\r
+ {\r
+ send(video_packets_, pkt);\r
+ send(audio_packets_, pkt);\r
+ }\r
+ video = video_decoder_.poll();\r
+ Context::Yield();\r
}\r
+ is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
+ muxer_.push(make_safe_ptr(video), hints); \r
}\r
- }\r
\r
- void decode_frame(int hints)\r
- {\r
- push_packets();\r
+ Context::Yield();\r
\r
- parallel_invoke(\r
- [&]\r
+ if(!muxer_.need_audio())\r
{\r
- if(muxer_.video_ready())\r
- return;\r
-\r
- auto video_frames = video_decoder_.poll();\r
- BOOST_FOREACH(auto& video, video_frames) \r
+ std::shared_ptr<core::audio_buffer> audio;\r
+ while(!audio)\r
{\r
- is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
- muxer_.push(video, hints); \r
+ auto pkt = create_packet();\r
+ if(try_receive(packets_, pkt))\r
+ {\r
+ send(video_packets_, pkt);\r
+ send(audio_packets_, pkt);\r
+ }\r
+ audio = audio_decoder_.poll();\r
+ Context::Yield();\r
}\r
- },\r
- [&]\r
- {\r
- if(muxer_.audio_ready())\r
- return;\r
- \r
- auto audio_samples = audio_decoder_.poll();\r
- BOOST_FOREACH(auto& audio, audio_samples)\r
- muxer_.push(audio); \r
- });\r
-\r
- muxer_.commit();\r
+ muxer_.push(make_safe_ptr(audio)); \r
+ }\r
}\r
\r
virtual int64_t nb_frames() const \r
#include <vector>\r
\r
using namespace caspar::core;\r
+using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
struct frame_muxer::implementation : boost::noncopyable\r
{ \r
+ frame_muxer::target_t& target_;\r
std::deque<std::queue<safe_ptr<write_frame>>> video_streams_;\r
std::deque<core::audio_buffer> audio_streams_;\r
std::deque<safe_ptr<basic_frame>> frame_buffer_;\r
safe_ptr<core::frame_factory> frame_factory_;\r
std::wstring filter_str_;\r
\r
- implementation(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
- : video_streams_(1)\r
+ implementation(frame_muxer::target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+ : target_(target)\r
+ , video_streams_(1)\r
, audio_streams_(1)\r
, display_mode_(display_mode::invalid)\r
, in_fps_(in_fps)\r
{\r
}\r
\r
- void push(const std::shared_ptr<AVFrame>& video_frame, int hints)\r
+ void push(const safe_ptr<AVFrame>& video_frame, int hints)\r
{ \r
- if(!video_frame)\r
+ if(video_frame == loop_video())\r
{ \r
CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
video_frame_count_ = 0;\r
video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
return;\r
}\r
-\r
- if(video_frame->data[0] == nullptr)\r
- {\r
- video_streams_.back().push(make_safe<core::write_frame>(this));\r
- ++video_frame_count_;\r
- display_mode_ = display_mode::simple;\r
- return;\r
- }\r
-\r
+ \r
if(display_mode_ == display_mode::invalid)\r
- initialize_display_mode(*video_frame);\r
+ initialize(*video_frame);\r
\r
if(hints & core::frame_producer::ALPHA_HINT)\r
video_frame->format = make_alpha_format(video_frame->format);\r
\r
if(video_streams_.back().size() > 8)\r
BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ \r
+ commit();\r
}\r
\r
- void push(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
+ void push(const safe_ptr<core::audio_buffer>& audio_samples)\r
{\r
- if(!audio_samples) \r
+ if(audio_samples == loop_audio()) \r
{\r
CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
audio_streams_.push_back(core::audio_buffer());\r
\r
if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
- }\r
\r
- safe_ptr<basic_frame> pop()\r
- { \r
- auto frame = frame_buffer_.front();\r
- frame_buffer_.pop_front(); \r
- return frame;\r
- }\r
-\r
- size_t size() const\r
- {\r
- return frame_buffer_.size();\r
+ commit();\r
}\r
-\r
+ \r
safe_ptr<core::write_frame> pop_video()\r
{\r
auto frame = video_streams_.front().front();\r
return samples;\r
}\r
\r
- bool video_ready() const\r
+ bool need_video() const\r
{ \r
- return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2());\r
+ return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && need_video2());\r
}\r
\r
- bool audio_ready() const\r
+ bool need_audio() const\r
{\r
- return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2());\r
+ return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && need_audio2());\r
}\r
\r
- bool video_ready2() const\r
+ bool need_video2() const\r
{ \r
switch(display_mode_)\r
{\r
}\r
}\r
\r
- bool audio_ready2() const\r
+ bool need_audio2() const\r
{\r
switch(display_mode_)\r
{\r
}\r
}\r
\r
+ bool try_pop(safe_ptr<core::basic_frame>& frame)\r
+ {\r
+ commit();\r
+\r
+ if(frame_buffer_.empty())\r
+ return false;\r
+\r
+ frame = std::move(frame_buffer_.front());\r
+ frame_buffer_.pop_front(); \r
+ return true;\r
+ }\r
+\r
void commit()\r
{\r
- if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2()))\r
+ if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!need_video2() || !need_audio2()))\r
{\r
if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
audio_streams_.pop_front();\r
}\r
\r
- if(!video_ready2() || !audio_ready2())\r
+ if(!need_video2() || !need_audio2())\r
return;\r
\r
switch(display_mode_)\r
case display_mode::simple:\r
case display_mode::deinterlace_bob:\r
case display_mode::deinterlace: \r
- return simple(frame_buffer_);\r
+ return simple();\r
case display_mode::duplicate: \r
- return duplicate(frame_buffer_);\r
+ return duplicate();\r
case display_mode::half: \r
- return half(frame_buffer_);\r
+ return half();\r
case display_mode::interlace:\r
case display_mode::deinterlace_bob_reinterlace: \r
- return interlace(frame_buffer_);\r
+ return interlace();\r
default: \r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
}\r
}\r
\r
- void simple(std::deque<safe_ptr<basic_frame>>& dest)\r
+ void simple()\r
{ \r
auto frame1 = pop_video();\r
frame1->audio_data() = pop_audio();\r
-\r
- dest.push_back(frame1); \r
+ \r
+ send(target_, safe_ptr<basic_frame>(frame1)); \r
}\r
\r
- void duplicate(std::deque<safe_ptr<basic_frame>>& dest)\r
+ void duplicate()\r
{ \r
auto frame = pop_video();\r
\r
\r
auto frame2 = frame;\r
frame2->audio_data() = pop_audio();\r
-\r
- dest.push_back(frame1);\r
- dest.push_back(frame2);\r
+ \r
+ send(target_, safe_ptr<basic_frame>(frame1)); \r
+ send(target_, safe_ptr<basic_frame>(frame2)); \r
}\r
\r
- void half(std::deque<safe_ptr<basic_frame>>& dest)\r
+ void half()\r
{ \r
auto frame1 = pop_video();\r
frame1->audio_data() = pop_audio();\r
\r
video_streams_.front().pop(); // Throw away\r
-\r
- dest.push_back(frame1);\r
+ \r
+ send(target_, safe_ptr<basic_frame>(frame1)); \r
}\r
\r
- void interlace(std::deque<safe_ptr<basic_frame>>& dest)\r
+ void interlace()\r
{ \r
auto frame1 = pop_video();\r
frame1->audio_data() = pop_audio();\r
\r
auto frame2 = pop_video();\r
-\r
- dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); \r
+ \r
+ send(target_, core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); \r
}\r
\r
- void initialize_display_mode(AVFrame& frame)\r
+ void initialize(AVFrame& frame)\r
{\r
auto display_mode = display_mode::invalid;\r
\r
}\r
\r
filter_ = filter(filter_str_);\r
+ display_mode_ = display_mode;\r
\r
CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode);\r
-\r
- display_mode_ = display_mode;\r
}\r
\r
\r
}\r
};\r
\r
-frame_muxer::frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
- : impl_(new implementation(in_fps, frame_factory, filter_str)){}\r
-void frame_muxer::push(const std::shared_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
-void frame_muxer::push(const std::shared_ptr<core::audio_buffer>& audio_samples){return impl_->push(audio_samples);}\r
-void frame_muxer::commit(){impl_->commit();}\r
-safe_ptr<basic_frame> frame_muxer::pop(){return impl_->pop();}\r
-size_t frame_muxer::size() const {return impl_->size();}\r
-bool frame_muxer::empty() const {return impl_->size() == 0;}\r
-bool frame_muxer::video_ready() const{return impl_->video_ready();}\r
-bool frame_muxer::audio_ready() const{return impl_->audio_ready();}\r
+frame_muxer::frame_muxer(target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str)\r
+ : impl_(new implementation(target, in_fps, frame_factory, filter_str)){}\r
+void frame_muxer::push(const safe_ptr<AVFrame>& video_frame, int hints){impl_->push(video_frame, hints);}\r
+void frame_muxer::push(const safe_ptr<core::audio_buffer>& audio_samples){return impl_->push(audio_samples);}\r
+bool frame_muxer::need_video() const{return impl_->need_video();}\r
+bool frame_muxer::need_audio() const{return impl_->need_audio();}\r
int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
\r
}}
\ No newline at end of file
\r
}\r
\r
+#include <agents.h>\r
+\r
namespace ffmpeg {\r
\r
class frame_muxer : boost::noncopyable\r
{\r
public:\r
- frame_muxer(double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str);\r
+ typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
+\r
+ frame_muxer(target_t& target, double in_fps, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter_str);\r
\r
- void push(const std::shared_ptr<AVFrame>& video_frame, int hints = 0);\r
- void push(const std::shared_ptr<core::audio_buffer>& audio_samples);\r
+ void push(const safe_ptr<AVFrame>& video_frame, int hints = 0);\r
+ void push(const safe_ptr<core::audio_buffer>& audio_samples);\r
\r
- void commit();\r
-\r
- bool video_ready() const;\r
- bool audio_ready() const;\r
-\r
- size_t size() const;\r
- bool empty() const;\r
+ bool need_video() const;\r
+ bool need_audio() const; \r
\r
int64_t calc_nb_frames(int64_t nb_frames) const;\r
-\r
- safe_ptr<core::basic_frame> pop();\r
private:\r
struct implementation;\r
safe_ptr<implementation> impl_;\r
static const size_t MAX_BUFFER_SIZE = 16 * 1000000;\r
static const size_t MAX_BUFFER_COUNT = 100;\r
\r
-struct input::implementation : public agent, public std::enable_shared_from_this<input::implementation>, boost::noncopyable\r
+struct input::implementation : public agent, boost::noncopyable\r
{ \r
- input::target_t& target_;\r
- const safe_ptr<diagnostics::graph> graph_;\r
+ const safe_ptr<AVFormatContext> format_context_; \r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
- const int default_stream_index_;\r
+ input::target_t& target_;\r
+\r
+ const int default_stream_index_;\r
\r
- const std::wstring filename_;\r
- const bool loop_;\r
- const size_t start_; \r
- const size_t length_;\r
- size_t frame_number_;\r
+ const std::wstring filename_;\r
+ const bool loop_;\r
+ const size_t start_; \r
+ const size_t length_;\r
+ size_t frame_number_;\r
\r
- tbb::atomic<size_t> buffer_size_;\r
- tbb::atomic<size_t> buffer_count_;\r
- Concurrency::event event_;\r
+ tbb::atomic<size_t> buffer_size_;\r
+ tbb::atomic<size_t> buffer_count_;\r
+ Concurrency::event event_;\r
\r
- tbb::atomic<bool> is_running_;\r
+ tbb::atomic<bool> is_running_;\r
\r
- tbb::atomic<size_t> nb_frames_;\r
- tbb::atomic<size_t> nb_loops_;\r
+ tbb::atomic<size_t> nb_frames_;\r
+ tbb::atomic<size_t> nb_loops_;\r
\r
public:\r
explicit implementation(input::target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
- : target_(target)\r
+ : format_context_(open_input(filename))\r
, graph_(graph)\r
- , format_context_(open_input(filename))\r
+ , target_(target)\r
, default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
, loop_(loop)\r
, filename_(filename)\r
auto size = packet->size;\r
auto data = packet->data;\r
\r
- auto self = shared_from_this();\r
packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
{\r
packet->size = size;\r
packet->data = data;\r
\r
- self->buffer_size_ -= packet->size;\r
- --self->buffer_count_;\r
- self->event_.set();\r
- self->graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- self->graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
+ buffer_size_ -= packet->size;\r
+ --buffer_count_;\r
+ event_.set();\r
+ graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
});\r
\r
buffer_size_ += packet->size;\r
if((buffer_size_ > MAX_BUFFER_SIZE || buffer_count_ > MAX_BUFFER_COUNT) && is_running_)\r
event_.reset();\r
\r
- send(target_, std::shared_ptr<AVPacket>(packet));\r
+ send(target_, packet);\r
\r
graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
graph_->update_value("buffer-count", (static_cast<double>(buffer_count_)+0.001)/MAX_BUFFER_COUNT);\r
\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
\r
- Concurrency::asend(target_, std::shared_ptr<AVPacket>()); \r
+ Concurrency::asend(target_, loop_packet()); \r
\r
graph_->add_tag("seek"); \r
} \r
class input : boost::noncopyable\r
{\r
public:\r
- typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
+ typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
\r
explicit input(target_t& target, const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
\r
\r
// Dataflow\r
\r
-safe_ptr<AVPacket> loop_packet(int index);\r
-safe_ptr<AVPacket> eof_packet(int index);\r
+safe_ptr<AVPacket> loop_packet(int index = 0);\r
+safe_ptr<AVPacket> eof_packet(int index = 0);\r
safe_ptr<AVFrame> loop_video();\r
safe_ptr<AVFrame> eof_video();\r
safe_ptr<core::audio_buffer> loop_audio();\r
#pragma warning (pop)\r
#endif\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
struct video_decoder::implementation : boost::noncopyable\r
{\r
+ video_decoder::source_t& source_;\r
const safe_ptr<core::frame_factory> frame_factory_;\r
int index_;\r
- safe_ptr<AVCodecContext> codec_context_;\r
+ const safe_ptr<AVCodecContext> codec_context_;\r
\r
- std::queue<std::shared_ptr<AVPacket>> packets_;\r
+ std::queue<safe_ptr<AVPacket>> packets_;\r
\r
const double fps_;\r
const int64_t nb_frames_;\r
const size_t width_;\r
const size_t height_;\r
-\r
+ \r
public:\r
- explicit implementation(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) \r
- : frame_factory_(frame_factory)\r
+ explicit implementation(video_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) \r
+ : source_(source)\r
+ , frame_factory_(frame_factory)\r
, codec_context_(open_codec(*context, AVMEDIA_TYPE_VIDEO, index_))\r
, fps_(static_cast<double>(codec_context_->time_base.den) / static_cast<double>(codec_context_->time_base.num))\r
, nb_frames_(context->streams[index_]->nb_frames)\r
, height_(codec_context_->height)\r
{ \r
}\r
-\r
- void push(const std::shared_ptr<AVPacket>& packet)\r
- {\r
- if(packet && packet->stream_index != index_)\r
- return;\r
-\r
- packets_.push(packet);\r
- }\r
-\r
- std::vector<std::shared_ptr<AVFrame>> poll()\r
+ \r
+ std::shared_ptr<AVFrame> poll()\r
{ \r
- std::vector<std::shared_ptr<AVFrame>> result;\r
-\r
- if(packets_.empty())\r
- return result;\r
+ auto packet = create_packet();\r
\r
- auto packet = packets_.front();\r
- \r
- if(packet)\r
- { \r
- boost::range::push_back(result, decode(*packet));\r
-\r
- if(packet->size == 0)\r
- packets_.pop();\r
+ if(packets_.empty())\r
+ {\r
+ if(!try_receive(source_, packet) || packet->stream_index != index_)\r
+ return nullptr;\r
+ else\r
+ packets_.push(packet);\r
}\r
- else\r
+ \r
+ packet = packets_.front();\r
+\r
+ std::shared_ptr<AVFrame> video;\r
+\r
+ if(packet == loop_packet())\r
{\r
if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
{\r
pkt.data = nullptr;\r
pkt.size = 0;\r
\r
- boost::range::push_back(result, decode(pkt)); \r
+ video = decode(pkt);\r
+ if(video)\r
+ packets_.push(packet);\r
}\r
\r
- if(result.empty())\r
- { \r
- packets_.pop();\r
+ if(!video)\r
+ {\r
avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
+ video = loop_video();\r
}\r
- }\r
- \r
- return result;\r
+ } \r
+ else\r
+ video = decode(*packet);\r
+\r
+ if(packet->size == 0)\r
+ packets_.pop();\r
+ \r
+ return video;\r
}\r
\r
- std::vector<std::shared_ptr<AVFrame>> decode(AVPacket& pkt)\r
+ std::shared_ptr<AVFrame> decode(AVPacket& pkt)\r
{\r
std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
\r
pkt.size = 0;\r
\r
if(frame_finished == 0) \r
- return std::vector<std::shared_ptr<AVFrame>>();\r
+ return nullptr;\r
\r
- if(decoded_frame->repeat_pict % 2 > 0)\r
+ if(decoded_frame->repeat_pict > 0)\r
CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
\r
- return std::vector<std::shared_ptr<AVFrame>>(1 + decoded_frame->repeat_pict/2, decoded_frame);\r
- }\r
- \r
- bool ready() const\r
- {\r
- return !packets_.empty();\r
+ return decoded_frame;\r
}\r
\r
double fps() const\r
}\r
};\r
\r
-video_decoder::video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) : impl_(new implementation(context, frame_factory)){}\r
-void video_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-std::vector<std::shared_ptr<AVFrame>> video_decoder::poll(){return impl_->poll();}\r
-bool video_decoder::ready() const{return impl_->ready();}\r
+video_decoder::video_decoder(video_decoder::source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory) : impl_(new implementation(source, context, frame_factory)){}\r
+std::shared_ptr<AVFrame> video_decoder::poll(){return impl_->poll();}\r
double video_decoder::fps() const{return impl_->fps();}\r
int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
size_t video_decoder::width() const{return impl_->width_;}\r
class write_frame;\r
}\r
\r
+#include <agents.h>\r
+\r
namespace ffmpeg {\r
\r
class video_decoder : boost::noncopyable\r
{\r
public:\r
- explicit video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory);\r
+ typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
+\r
+ explicit video_decoder(source_t& source, const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory);\r
\r
- void push(const std::shared_ptr<AVPacket>& packet);\r
- bool ready() const;\r
- std::vector<std::shared_ptr<AVFrame>> poll();\r
+ std::shared_ptr<AVFrame> poll();\r
\r
size_t width() const;\r
size_t height() const;\r