#include <connect.h>\r
#include <semaphore.h>\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
-struct audio_decoder::implementation : boost::noncopyable\r
+struct audio_decoder::implementation : public agent, boost::noncopyable\r
{ \r
int index_;\r
std::shared_ptr<AVCodecContext> codec_context_; \r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- Concurrency::transformer<packet_message_t, audio_message_t> transformer_;\r
+ overwrite_buffer<bool> is_running_;\r
+ unbounded_buffer<packet_message_t> source_;\r
+ ITarget<audio_message_t>& target_;\r
\r
public:\r
explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
format_desc.audio_sample_rate, codec_context_->sample_rate,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
+ , source_([this](const packet_message_t& message)\r
{\r
return message->payload && message->payload->stream_index == index_;\r
})\r
+ , target_(target)\r
{ \r
CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
- Concurrency::connect(source, transformer_);\r
+ Concurrency::connect(source, source_);\r
+\r
+ start();\r
}\r
\r
- audio_message_t decode(const packet_message_t& message)\r
- { \r
- auto packet = message->payload;\r
+ ~implementation()\r
+ {\r
+ send(is_running_, false);\r
+ agent::wait(this);\r
+ }\r
\r
- if(!packet)\r
- return make_message(std::shared_ptr<core::audio_buffer>());\r
+ virtual void run()\r
+ {\r
+ try\r
+ {\r
+ send(is_running_, true);\r
+ while(is_running_.value())\r
+ { \r
+ auto message = receive(source_);\r
+ auto packet = message->payload;\r
+ \r
+ if(!packet)\r
+ continue;\r
\r
- if(packet == loop_packet(index_))\r
- return make_message(loop_audio());\r
+ if(packet == loop_packet(index_))\r
+ {\r
+ send(target_, make_message(loop_audio()));\r
+ break;\r
+ }\r
\r
- if(packet == eof_packet(index_))\r
- return make_message(eof_audio());\r
+ if(packet == eof_packet(index_))\r
+ break;\r
\r
- auto result = std::make_shared<core::audio_buffer>();\r
+ auto result = std::make_shared<core::audio_buffer>();\r
\r
- while(packet->size > 0)\r
- {\r
- buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
- int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+ while(packet->size > 0)\r
+ {\r
+ buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+ int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
\r
- int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+ int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
\r
- // There might be several frames in one packet.\r
- packet->size -= ret;\r
- packet->data += ret;\r
+ // There might be several frames in one packet.\r
+ packet->size -= ret;\r
+ packet->data += ret;\r
\r
- buffer1_.resize(written_bytes);\r
+ buffer1_.resize(written_bytes);\r
\r
- buffer1_ = resampler_.resample(std::move(buffer1_));\r
+ buffer1_ = resampler_.resample(std::move(buffer1_));\r
\r
- const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
- const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+ const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+ const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
\r
- result->insert(result->end(), samples, samples + n_samples);\r
+ send(target_, make_message(std::make_shared<core::audio_buffer>(samples, samples + n_samples)));\r
+ }\r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
- \r
- return make_message(result, message->token);\r
+\r
+ send(is_running_, false);\r
+ send(target_, make_message(eof_audio()));\r
+\r
+ done();\r
}\r
};\r
\r
#include <connect.h>\r
#include <semaphore.h>\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
-struct video_decoder::implementation : boost::noncopyable\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
int index_;\r
std::shared_ptr<AVCodecContext> codec_context_;\r
size_t height_;\r
bool is_progressive_;\r
\r
- Concurrency::transformer<packet_message_t, video_message_t> transformer_;\r
+ overwrite_buffer<bool> is_running_;\r
+ unbounded_buffer<packet_message_t> source_;\r
+ ITarget<video_message_t>& target_;\r
\r
- safe_ptr<Concurrency::semaphore> semaphore_;\r
+ safe_ptr<semaphore> semaphore_;\r
\r
public:\r
explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
, is_progressive_(true)\r
- , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
+ , source_([this](const packet_message_t& message)\r
{\r
return message->payload && message->payload->stream_index == index_;\r
})\r
+ , target_(target)\r
, semaphore_(make_safe<Concurrency::semaphore>(1))\r
{ \r
CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
\r
- Concurrency::connect(source, transformer_);\r
- }\r
- \r
- video_message_t decode(const packet_message_t& message)\r
- {\r
- auto packet = message->payload;\r
+ Concurrency::connect(source, source_);\r
\r
- if(!packet)\r
- return make_message(std::shared_ptr<AVFrame>());\r
+ start();\r
+ }\r
\r
- if(packet == loop_packet(index_))\r
- return make_message(loop_video());\r
+ ~implementation()\r
+ {\r
+ send(is_running_, false);\r
+ agent::wait(this);\r
+ }\r
\r
- if(packet == eof_packet(index_))\r
- return make_message(eof_video());\r
- \r
- token token(semaphore_);\r
- std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+ virtual void run()\r
+ {\r
+ try\r
{\r
- av_free(frame);\r
- });\r
-\r
- int frame_finished = 0;\r
- THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
-\r
- // 1 packet <=> 1 frame.\r
- // If a decoder consumes less then the whole packet then something is wrong\r
- // that might be just harmless padding at the end, or a problem with the\r
- // AVParser or demuxer which puted more then one frame in a AVPacket.\r
-\r
- if(frame_finished == 0) \r
- return make_message(std::shared_ptr<AVFrame>());\r
-\r
- if(decoded_frame->repeat_pict > 0)\r
- CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+ send(is_running_, true);\r
+ while(is_running_.value())\r
+ {\r
+ auto message = receive(source_);\r
+ auto packet = message->payload;\r
+ \r
+ if(!packet)\r
+ continue;\r
+\r
+ if(packet == loop_packet(index_))\r
+ {\r
+ send(target_, make_message(loop_video()));\r
+ continue;\r
+ }\r
+\r
+ if(packet == eof_packet(index_))\r
+ break;\r
+\r
+ token token(semaphore_);\r
+ std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+ {\r
+ av_free(frame);\r
+ });\r
+\r
+ int frame_finished = 0;\r
+ THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
+\r
+ // 1 packet <=> 1 frame.\r
+ // If a decoder consumes less then the whole packet then something is wrong\r
+ // that might be just harmless padding at the end, or a problem with the\r
+ // AVParser or demuxer which puted more then one frame in a AVPacket.\r
+\r
+ if(frame_finished == 0) \r
+ continue;\r
+\r
+ if(decoded_frame->repeat_pict > 0)\r
+ CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
\r
- is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+ is_progressive_ = decoded_frame->interlaced_frame == 0;\r
\r
- Concurrency::wait(10);\r
- return make_message(decoded_frame, message->token);\r
+ send(target_, make_message(decoded_frame, message->token));\r
+ Concurrency::wait(10);\r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
+ \r
+ send(is_running_, false),\r
+ send(target_, make_message(eof_video()));\r
+\r
+ done();\r
}\r
\r
double fps() const\r