\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Tue Oct 18 00:55:44 2011\r
+/* at Tue Oct 18 13:46:11 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Tue Oct 18 00:55:44 2011\r
+/* at Tue Oct 18 13:46:11 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
struct audio_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
{\r
- audio_decoder::token_t& active_token_;\r
audio_decoder::source_t& source_;\r
audio_decoder::target_t& target_;\r
\r
\r
int64_t nb_frames_;\r
public:\r
- explicit implementation(audio_decoder::token_t& active_token,\r
- audio_decoder::source_t& source,\r
+ explicit implementation(audio_decoder::source_t& source,\r
audio_decoder::target_t& target,\r
const safe_ptr<AVFormatContext>& context, \r
const core::video_format_desc& format_desc) \r
- : active_token_(active_token)\r
- , source_(source)\r
+ : source_(source)\r
, target_(target)\r
, format_desc_(format_desc) \r
, nb_frames_(0)\r
{\r
try\r
{\r
- while(Concurrency::receive(active_token_))\r
+ while(true)\r
{\r
auto packet = Concurrency::receive(source_);\r
- if(packet == eof_packet())\r
- {\r
- Concurrency::send(target_, eof_audio());\r
+ if(packet == eof_packet()) \r
break;\r
- }\r
-\r
+ \r
if(packet == loop_packet()) \r
{ \r
if(codec_context_)\r
avcodec_flush_buffers(codec_context_.get());\r
Concurrency::send(target_, loop_audio());\r
} \r
- else if(!codec_context_)\r
- Concurrency::send(target_, empty_audio()); \r
- else \r
- Concurrency::send(target_, decode(*packet)); \r
+ else if(packet->stream_index == index_)\r
+ {\r
+ if(!codec_context_)\r
+ Concurrency::send(target_, empty_audio()); \r
+ else \r
+ Concurrency::send(target_, decode(*packet)); \r
+ }\r
}\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
-\r
- std::shared_ptr<AVPacket> packet;\r
- Concurrency::try_receive(source_, packet); \r
+ \r
+ Concurrency::send(target_, eof_audio());\r
\r
done();\r
}\r
}\r
};\r
\r
-audio_decoder::audio_decoder(token_t& active_token,\r
- source_t& source,\r
+audio_decoder::audio_decoder(source_t& source,\r
target_t& target,\r
const safe_ptr<AVFormatContext>& context, \r
const core::video_format_desc& format_desc)\r
- : impl_(new implementation(active_token, source, target, context, format_desc))\r
+ : impl_(new implementation(source, target, context, format_desc))\r
{\r
}\r
int64_t audio_decoder::nb_frames() const{return impl_->nb_frames_;}\r
{\r
public:\r
\r
- typedef Concurrency::ISource<bool> token_t;\r
typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>> target_t;\r
\r
- explicit audio_decoder(token_t& active_token,\r
- source_t& source,\r
+ explicit audio_decoder(source_t& source,\r
target_t& target,\r
const safe_ptr<AVFormatContext>& context, \r
const core::video_format_desc& format_desc);\r
const bool loop_;\r
const size_t length_;\r
\r
- buffer_alias<AVPacket>::type video_packets_;\r
- buffer_alias<AVPacket>::type audio_packets_;\r
- buffer_alias<AVFrame>::type video_frames_;\r
- buffer_alias<core::audio_buffer>::type audio_buffers_;\r
- buffer_alias<core::basic_frame>::type muxed_frames_;\r
- Concurrency::overwrite_buffer<bool> active_token_;\r
+ buffer_alias<AVPacket>::type packets0_;\r
+ buffer_alias<AVPacket>::type packets1_;\r
+ buffer_alias<AVFrame>::type video_frames_;\r
+ buffer_alias<core::audio_buffer>::type audio_buffers_;\r
+ buffer_alias<core::basic_frame>::type muxed_frames_;\r
\r
const safe_ptr<diagnostics::graph> graph_;\r
\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
- , video_packets_(25)\r
- , audio_packets_(25)\r
+ , packets0_(2)\r
+ , packets1_(2)\r
, video_frames_(2)\r
, audio_buffers_(2)\r
, muxed_frames_(2)\r
, graph_(diagnostics::create_graph([this]{return print();}, false))\r
- , input_(active_token_, video_packets_, audio_packets_, graph_, filename_, loop, start, length)\r
- , video_decoder_(active_token_, video_packets_, video_frames_, input_.context(), frame_factory->get_video_format_desc().fps, filter)\r
- , audio_decoder_(active_token_, audio_packets_, audio_buffers_, input_.context(), frame_factory->get_video_format_desc())\r
- , muxer_(active_token_, video_frames_, audio_buffers_, muxed_frames_, video_decoder_.fps(), frame_factory)\r
+ , input_(packets0_, graph_, filename_, loop, start, length)\r
+ , video_decoder_(packets0_, packets1_, video_frames_, input_.context(), frame_factory->get_video_format_desc().fps, filter)\r
+ , audio_decoder_(packets1_, audio_buffers_, input_.context(), frame_factory->get_video_format_desc())\r
+ , muxer_(video_frames_, audio_buffers_, muxed_frames_, video_decoder_.fps(), frame_factory)\r
, last_frame_(core::basic_frame::empty())\r
{\r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
graph_->start();\r
-\r
- Concurrency::send(active_token_, true);\r
}\r
\r
~ffmpeg_producer()\r
{\r
- Concurrency::send(active_token_, false);\r
- std::shared_ptr<core::basic_frame> frame;\r
- Concurrency::try_receive(muxed_frames_, frame);\r
+ input_.stop();\r
+ while(Concurrency::receive(muxed_frames_) != core::basic_frame::eof())\r
+ {\r
+ }\r
}\r
- \r
+ \r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
auto frame = core::basic_frame::late();\r
\r
struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
- frame_muxer2::token_t& active_token_;\r
frame_muxer2::video_source_t& video_source_;\r
frame_muxer2::audio_source_t& audio_source_;\r
frame_muxer2::target_t& target_;\r
filter filter_;\r
safe_ptr<core::frame_factory> frame_factory_;\r
\r
- implementation(frame_muxer2::token_t& active_token,\r
- frame_muxer2::video_source_t& video_source,\r
+ implementation(frame_muxer2::video_source_t& video_source,\r
frame_muxer2::audio_source_t& audio_source,\r
frame_muxer2::target_t& target,\r
double in_fps, \r
const safe_ptr<core::frame_factory>& frame_factory)\r
- : active_token_(active_token)\r
- , video_source_(video_source)\r
+ : video_source_(video_source)\r
, audio_source_(audio_source)\r
, target_(target)\r
, video_streams_(1)\r
{\r
try\r
{\r
- while(Concurrency::receive(active_token_))\r
+ while(true)\r
{\r
Concurrency::parallel_invoke(\r
[&]\r
} \r
});\r
\r
- if(!video_ready() || !audio_ready())\r
- {\r
- Concurrency::send(target_, std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));\r
- break;\r
- }\r
+ if(!video_ready() || !audio_ready()) \r
+ break; \r
\r
commit();\r
\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
-\r
- std::shared_ptr<AVFrame> video;\r
- Concurrency::try_receive(video_source_, video);\r
- std::shared_ptr<core::audio_buffer> audio;\r
- Concurrency::try_receive(audio_source_, audio);\r
- \r
+ \r
+ Concurrency::send(target_, std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));\r
+ \r
done();\r
}\r
\r
}\r
};\r
\r
-frame_muxer2::frame_muxer2(token_t& active_token,\r
- video_source_t& video_source, \r
+frame_muxer2::frame_muxer2(video_source_t& video_source, \r
audio_source_t& audio_source,\r
target_t& target,\r
double in_fps, \r
const safe_ptr<core::frame_factory>& frame_factory)\r
- : impl_(new implementation(active_token, video_source, audio_source, target, in_fps, frame_factory))\r
+ : impl_(new implementation(video_source, audio_source, target, in_fps, frame_factory))\r
{\r
}\r
int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
{\r
public:\r
\r
- typedef Concurrency::ISource<bool> token_t;\r
typedef Concurrency::ISource<std::shared_ptr<AVFrame>> video_source_t;\r
typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>> audio_source_t;\r
typedef Concurrency::ITarget<std::shared_ptr<core::basic_frame>> target_t;\r
\r
- frame_muxer2(token_t& active_token,\r
- video_source_t& video_source,\r
+ frame_muxer2(video_source_t& video_source,\r
audio_source_t& audio_source, \r
target_t& target,\r
double in_fps, \r
const size_t length_;\r
size_t frame_number_;\r
\r
- input::token_t& active_token_;\r
- input::target_t& video_target_;\r
- input::target_t& audio_target_;\r
+ input::target_t& target_;\r
\r
tbb::atomic<size_t> nb_frames_;\r
tbb::atomic<size_t> nb_loops_;\r
\r
- int video_index_;\r
- int audio_index_;\r
+ std::deque<std::shared_ptr<AVPacket>> buffer_;\r
+ size_t buffer_size_;\r
+\r
+ bool eof_;\r
+ bool stop_;\r
\r
public:\r
- explicit implementation(input::token_t& active_token,\r
- input::target_t& video_target,\r
- input::target_t& audio_target,\r
+ explicit implementation(input::target_t& target,\r
const safe_ptr<diagnostics::graph>& graph, \r
const std::wstring& filename, \r
bool loop, \r
size_t start,\r
size_t length)\r
- : active_token_(active_token)\r
- , video_target_(video_target)\r
- , audio_target_(audio_target)\r
+ : target_(target)\r
, graph_(graph)\r
, loop_(loop)\r
, filename_(filename)\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
+ , buffer_size_(0)\r
+ , eof_(false)\r
+ , stop_(false)\r
{ \r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print());\r
\r
default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print());\r
- video_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0);\r
- audio_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0);\r
\r
if(start_ > 0) \r
seek_frame(start_);\r
for(int n = 0; n < 16; ++n)\r
read_next_packet();\r
\r
- graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
+ graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+ graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
+ graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
\r
agent::start();\r
}\r
{\r
try\r
{\r
- while(Concurrency::receive(active_token_))\r
+ while(!stop_)\r
{\r
- if(!read_next_packet())\r
- {\r
- Concurrency::send(video_target_, eof_packet());\r
- Concurrency::send(audio_target_, eof_packet());\r
+ read_next_packet();\r
+\r
+ if(buffer_.empty())\r
break;\r
+\r
+ if(buffer_.size() < MIN_BUFFER_COUNT || (buffer_.size() < MAX_BUFFER_COUNT && buffer_size_ < MAX_BUFFER_SIZE))\r
+ {\r
+ if(Concurrency::asend(target_, buffer_.front()))\r
+ buffer_.pop_front();\r
+ Concurrency::wait(2);\r
}\r
+ else\r
+ {\r
+ Concurrency::send(target_, buffer_.front());\r
+ buffer_.pop_front();\r
+ } \r
+\r
+ graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); \r
} \r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- } \r
+ } \r
+ \r
+ Concurrency::send(target_, eof_packet()); \r
\r
done();\r
}\r
\r
- bool read_next_packet()\r
+ void read_next_packet()\r
{ \r
+ if(eof_)\r
+ return;\r
+\r
int ret = 0;\r
\r
auto read_packet = create_packet();\r
else\r
{\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
- return false;\r
+ eof_ = true;\r
}\r
}\r
- else if(read_packet->stream_index == video_index_ || read_packet->stream_index == audio_index_)\r
+ else\r
{ \r
THROW_ON_ERROR(ret, print(), "av_read_frame");\r
\r
read_packet->data = data;\r
});\r
\r
- if(read_packet->stream_index == video_index_)\r
- Concurrency::send(video_target_, read_packet);\r
- else if(read_packet->stream_index == audio_index_)\r
- Concurrency::send(audio_target_, read_packet);\r
+ buffer_.push_back(read_packet);\r
+ buffer_size_ += read_packet->size;\r
+ \r
+ graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
} \r
-\r
- return true;\r
}\r
\r
void seek_frame(int64_t frame, int flags = 0)\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
auto packet = create_packet();\r
packet->size = 0;\r
- Concurrency::send(video_target_, loop_packet()); \r
- Concurrency::send(audio_target_, loop_packet());\r
+ buffer_.push_back(loop_packet());\r
} \r
\r
bool is_eof(int ret)\r
}\r
};\r
\r
-input::input(token_t& active_token, \r
- target_t& video_target, \r
- target_t& audio_target, \r
+input::input(target_t& target, \r
const safe_ptr<diagnostics::graph>& graph, \r
const std::wstring& filename, \r
bool loop, \r
size_t start, \r
size_t length)\r
- : impl_(new implementation(active_token, video_target, audio_target, graph, filename, loop, start, length))\r
+ : impl_(new implementation(target, graph, filename, loop, start, length))\r
{\r
}\r
\r
return impl_->nb_loops_;\r
}\r
\r
+void input::stop()\r
+{\r
+ impl_->stop_ = true;\r
+}\r
+\r
}}
\ No newline at end of file
{\r
public:\r
\r
- typedef Concurrency::ISource<bool> token_t;\r
typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
\r
- explicit input(token_t& active_token,\r
- target_t& video_target, \r
- target_t& audio_target, \r
+ explicit input(target_t& target, \r
const safe_ptr<diagnostics::graph>& graph, \r
const std::wstring& filename, bool loop, \r
size_t start = 0, \r
size_t nb_loops() const;\r
\r
safe_ptr<AVFormatContext> context();\r
+\r
+ void stop();\r
private:\r
friend struct implemenation;\r
struct implementation;\r
\r
struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
{\r
- video_decoder::token_t& active_token_;\r
video_decoder::source_t& source_;\r
+ video_decoder::forward_t& forward_;\r
video_decoder::target_t& target_;\r
\r
std::shared_ptr<AVCodecContext> codec_context_;\r
bool is_progressive_;\r
\r
public:\r
- explicit implementation(video_decoder::token_t& active_token,\r
- video_decoder::source_t& source,\r
+ explicit implementation(video_decoder::source_t& source,\r
+ video_decoder::forward_t& forward,\r
video_decoder::target_t& target,\r
const safe_ptr<AVFormatContext>& context,\r
double fps,\r
const std::wstring& filter) \r
- : active_token_(active_token)\r
- , source_(source)\r
+ : source_(source)\r
+ , forward_(forward)\r
, target_(target)\r
, filter_(filter.empty() ? L"copy" : filter)\r
, fps_(fps)\r
{\r
try\r
{\r
- while(Concurrency::receive(active_token_))\r
+ while(true)\r
{\r
auto packet = Concurrency::receive(source_);\r
- if(packet == eof_packet())\r
- {\r
- Concurrency::send(target_, eof_video());\r
- break;\r
- }\r
\r
+ if(packet == eof_packet() || packet == loop_packet() || packet->stream_index != index_)\r
+ Concurrency::send(forward_, packet);\r
+ \r
+ if(packet == eof_packet()) \r
+ break;\r
+ \r
if(packet == loop_packet())\r
{ \r
if(codec_context_)\r
\r
avcodec_flush_buffers(codec_context_.get());\r
}\r
-\r
+ \r
Concurrency::send(target_, loop_video()); \r
}\r
- else if(!codec_context_)\r
- {\r
- Concurrency::send(target_, empty_video()); \r
- }\r
- else\r
+ else if(packet->stream_index == index_)\r
{\r
- while(packet->size > 0)\r
+ if(!codec_context_)\r
+ {\r
+ Concurrency::send(target_, empty_video()); \r
+ }\r
+ else\r
{\r
- BOOST_FOREACH(auto& frame1, decode(*packet))\r
+ while(packet->size > 0)\r
{\r
- BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
- Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+ BOOST_FOREACH(auto& frame1, decode(*packet))\r
+ {\r
+ BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
+ Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+ }\r
}\r
}\r
}\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
-\r
- std::shared_ptr<AVPacket> packet;\r
- Concurrency::try_receive(source_, packet); \r
\r
+ Concurrency::send(target_, eof_video());\r
+ \r
done();\r
}\r
\r
}\r
};\r
\r
-video_decoder::video_decoder(token_t& active_token,\r
- source_t& source,\r
+video_decoder::video_decoder(source_t& source,\r
+ forward_t& forward,\r
target_t& target,\r
const safe_ptr<AVFormatContext>& context, \r
double fps, \r
const std::wstring& filter) \r
- : impl_(new implementation(active_token, source, target, context, fps, filter))\r
+ : impl_(new implementation(source, forward, target, context, fps, filter))\r
{\r
}\r
\r
{\r
public:\r
\r
- typedef Concurrency::ISource<bool> token_t;\r
typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> forward_t;\r
typedef Concurrency::ITarget<std::shared_ptr<AVFrame>> target_t;\r
\r
- explicit video_decoder(token_t& active_token,\r
- source_t& source,\r
+ explicit video_decoder(source_t& source,\r
+ forward_t& forward,\r
target_t& target,\r
const safe_ptr<AVFormatContext>& context, \r
double fps, \r