\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sat Oct 22 04:24:45 2011\r
+/* at Sun Oct 23 22:39:56 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Sat Oct 22 04:24:45 2011\r
+/* at Sun Oct 23 22:39:56 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
{ \r
- Concurrency::bounded_buffer<std::shared_ptr<AVFrame>> video_frames_;\r
- Concurrency::bounded_buffer<std::shared_ptr<core::audio_buffer>> audio_buffers_;\r
- Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> muxed_frames_;\r
+ Concurrency::bounded_buffer<safe_ptr<AVFrame>> video_frames_;\r
+ Concurrency::bounded_buffer<safe_ptr<core::audio_buffer>> audio_buffers_;\r
+ Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> muxed_frames_;\r
\r
const core::video_format_desc format_desc_;\r
const size_t device_index_;\r
, device_index_(device_index)\r
, last_frame_(core::basic_frame::empty())\r
, length_(length)\r
- , muxer_(&video_frames_, &audio_buffers_, muxed_frames_, format_desc.fps, frame_factory, filter_str)\r
+ , muxer_(&video_frames_, &audio_buffers_, muxed_frames_, format_desc.fps, frame_factory)\r
, is_running_(true)\r
{\r
agent::start();\r
Concurrency::parallel_invoke(\r
[&]\r
{\r
- Concurrency::send<std::shared_ptr<AVFrame>>(video_frames_, av_frame); \r
+ Concurrency::send(video_frames_, av_frame); \r
},\r
[&]\r
{ \r
{\r
auto sample_frame_count = audio->GetSampleFrameCount();\r
auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
- Concurrency::send<std::shared_ptr<core::audio_buffer>>(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
+ Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
}\r
else\r
- Concurrency::send<std::shared_ptr<core::audio_buffer>>(audio_buffers_, ffmpeg::empty_audio()); \r
+ Concurrency::send(audio_buffers_, ffmpeg::empty_audio()); \r
});\r
}\r
\r
#include <core/consumer/frame_consumer.h>\r
#include <core/producer/frame_producer.h>\r
\r
-#include <boost/thread/recursive_mutex.hpp>\r
+#include <ppl.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (disable : 4244)\r
if(!mutex)\r
return 0;\r
\r
- auto my_mutex = reinterpret_cast<boost::recursive_mutex*>(*mutex);\r
+ auto my_mutex = reinterpret_cast<Concurrency::critical_section*>(*mutex);\r
\r
switch(op) \r
{ \r
case AV_LOCK_CREATE: \r
{ \r
- *mutex = new boost::recursive_mutex(); \r
+ *mutex = new Concurrency::critical_section(); \r
break; \r
} \r
case AV_LOCK_OBTAIN: \r
<Lib />\r
</ItemDefinitionGroup>\r
<ItemGroup>\r
+ <ClCompile Include="consumer\ffmpeg_consumer.cpp">\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">true</ExcludedFromBuild>\r
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>\r
+ </ClCompile>\r
<ClCompile Include="ffmpeg.cpp">\r
<ShowIncludes Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">false</ShowIncludes>\r
</ClCompile>\r
</ClCompile>\r
</ItemGroup>\r
<ItemGroup>\r
+ <ClInclude Include="consumer\ffmpeg_consumer.h" />\r
<ClInclude Include="ffmpeg.h" />\r
<ClInclude Include="ffmpeg_error.h" />\r
<ClInclude Include="producer\audio\audio_decoder.h" />\r
<ClInclude Include="producer\audio\audio_resampler.h" />\r
- <ClInclude Include="producer\display_mode.h" />\r
<ClInclude Include="producer\ffmpeg_producer.h" />\r
<ClInclude Include="producer\filter\filter.h" />\r
<ClInclude Include="producer\filter\parallel_yadif.h" />\r
<Filter Include="source">\r
<UniqueIdentifier>{c9454245-e85a-45ba-960f-203b00d18454}</UniqueIdentifier>\r
</Filter>\r
+ <Filter Include="source\consumer">\r
+ <UniqueIdentifier>{4db7d9c3-08ea-4423-a303-f18737629268}</UniqueIdentifier>\r
+ </Filter>\r
<Filter Include="source\producer">\r
<UniqueIdentifier>{c5a94fd1-4552-4f6d-97cd-24e44e662e0f}</UniqueIdentifier>\r
</Filter>\r
<ClCompile Include="producer\audio\audio_decoder.cpp">\r
<Filter>source\producer\audio</Filter>\r
</ClCompile>\r
+ <ClCompile Include="consumer\ffmpeg_consumer.cpp">\r
+ <Filter>source\consumer</Filter>\r
+ </ClCompile>\r
<ClCompile Include="StdAfx.cpp" />\r
<ClCompile Include="ffmpeg.cpp">\r
<Filter>source</Filter>\r
<ClInclude Include="producer\audio\audio_decoder.h">\r
<Filter>source\producer\audio</Filter>\r
</ClInclude>\r
+ <ClInclude Include="consumer\ffmpeg_consumer.h">\r
+ <Filter>source\consumer</Filter>\r
+ </ClInclude>\r
<ClInclude Include="StdAfx.h" />\r
<ClInclude Include="ffmpeg_error.h">\r
<Filter>source</Filter>\r
<ClInclude Include="producer\input.h">\r
<Filter>source\producer</Filter>\r
</ClInclude>\r
- <ClInclude Include="producer\display_mode.h">\r
- <Filter>source\producer</Filter>\r
- </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
#include "../util.h"\r
#include "../../ffmpeg_error.h"\r
\r
-#include <common/concurrency/message.h>\r
#include <core/video_format.h>\r
\r
#include <tbb/cache_aligned_allocator.h>\r
#pragma warning (pop)\r
#endif\r
\r
-#include <agents.h>\r
+#include <connect.h>\r
#include <semaphore.h>\r
\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct audio_decoder::implementation : boost::noncopyable\r
+struct audio_decoder::implementation : public agent, boost::noncopyable\r
{ \r
int index_;\r
std::shared_ptr<AVCodecContext> codec_context_; \r
audio_resampler resampler_;\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
- \r
- safe_ptr<semaphore> semaphore_;\r
\r
- transformer<safe_ptr<AVPacket>, std::shared_ptr<core::audio_buffer>> transformer_; \r
+ overwrite_buffer<bool> is_running_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> source_;\r
+ ITarget<safe_ptr<core::audio_buffer>>& target_;\r
+ \r
public:\r
explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
: codec_context_(open_codec(context, AVMEDIA_TYPE_AUDIO, index_))\r
format_desc.audio_sample_rate, codec_context_->sample_rate,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , semaphore_(make_safe<semaphore>(32))\r
- , transformer_([this](const safe_ptr<AVPacket>& packet){return (*this)(packet);}, &target,\r
- [this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
- { \r
- source.link_target(&transformer_);\r
- CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name; \r
+ , source_([this](const safe_ptr<AVPacket>& packet)\r
+ {\r
+ return packet->stream_index == index_;\r
+ })\r
+ , target_(target)\r
+ { \r
+ CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
+\r
+ Concurrency::connect(source, source_);\r
+\r
+ start();\r
}\r
- \r
+\r
~implementation()\r
{\r
- semaphore_->release();\r
+ send(is_running_, false);\r
+ agent::wait(this);\r
}\r
\r
- std::shared_ptr<core::audio_buffer> operator()(const safe_ptr<AVPacket>& packet)\r
- { \r
+ virtual void run()\r
+ {\r
try\r
{\r
- auto tok = make_safe<token>(semaphore_);\r
-\r
- if(packet == loop_packet(index_))\r
- {\r
- avcodec_flush_buffers(codec_context_.get());\r
- return loop_audio();\r
- }\r
+ send(is_running_, true);\r
+ while(is_running_.value())\r
+ { \r
+ auto packet = receive(source_);\r
+ \r
+ if(packet == loop_packet(index_))\r
+ {\r
+ send(target_, loop_audio());\r
+ continue;\r
+ }\r
\r
- if(packet == eof_packet(index_)) \r
- return eof_audio(); \r
+ if(packet == eof_packet(index_))\r
+ break;\r
\r
- auto result = safe_ptr<core::audio_buffer>(new core::audio_buffer(), [this, tok](core::audio_buffer* p)\r
- {\r
- delete p;\r
- });\r
+ auto result = std::make_shared<core::audio_buffer>();\r
\r
- while(packet->size > 0)\r
- {\r
- buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
- int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
+ while(packet->size > 0)\r
+ {\r
+ buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
+ int written_bytes = buffer1_.size() - FF_INPUT_BUFFER_PADDING_SIZE;\r
\r
- int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
+ int ret = THROW_ON_ERROR2(avcodec_decode_audio3(codec_context_.get(), reinterpret_cast<int16_t*>(buffer1_.data()), &written_bytes, packet.get()), "[audio_decoder]");\r
\r
- // There might be several frames in one packet.\r
- packet->size -= ret;\r
- packet->data += ret;\r
+ // There might be several frames in one packet.\r
+ packet->size -= ret;\r
+ packet->data += ret;\r
\r
- buffer1_.resize(written_bytes);\r
+ buffer1_.resize(written_bytes);\r
\r
- buffer1_ = resampler_.resample(std::move(buffer1_));\r
+ buffer1_ = resampler_.resample(std::move(buffer1_));\r
\r
- const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
- const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+ const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
+ const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
\r
- result->insert(result->end(), samples, samples + n_samples);\r
+ send(target_, make_safe<core::audio_buffer>(samples, samples + n_samples));\r
+ }\r
+ Concurrency::wait(5);\r
}\r
- return result;\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- return eof_audio();\r
}\r
+\r
+ send(is_running_, false);\r
+ send(target_, eof_audio());\r
+\r
+ done();\r
}\r
};\r
\r
-audio_decoder::audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
+audio_decoder::audio_decoder(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc)\r
: impl_(new implementation(source, target, context, format_desc))\r
{\r
}\r
return 0;\r
}\r
\r
-\r
}}
\ No newline at end of file
public:\r
\r
typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
- typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>>& target_t;\r
+ typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
\r
explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
\r
\r
buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
\r
- CASPAR_LOG(warning) << L"Resampling: " <<\r
- L" sample_rate: " << input_channels <<\r
- L" audio_channels: " << input_channels <<\r
- L" sample_fmt: " << input_sample_format;\r
+ CASPAR_LOG(warning) << L"Resampling." <<\r
+ L" sample_rate:" << input_channels <<\r
+ L" audio_channels:" << input_channels <<\r
+ L" sample_fmt:" << input_sample_format;\r
\r
CASPAR_VERIFY(resampler, caspar_exception());\r
\r
\r
#include <agents.h>\r
#include <agents_extras.h>\r
+#include <connect.h>\r
#include <concrt.h>\r
#include <ppl.h>\r
\r
const bool loop_;\r
const size_t length_;\r
\r
- unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
- unbounded_buffer<std::shared_ptr<AVFrame>> video_;\r
- unbounded_buffer<std::shared_ptr<core::audio_buffer>> audio_;\r
call<safe_ptr<AVPacket>> throw_away_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
+ unbounded_buffer<safe_ptr<AVFrame>> video_;\r
+ unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
bounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
\r
const safe_ptr<diagnostics::graph> graph_;\r
\r
input input_; \r
- std::shared_ptr<audio_decoder> audio_decoder_; \r
std::shared_ptr<video_decoder> video_decoder_;\r
+ std::shared_ptr<audio_decoder> audio_decoder_; \r
std::unique_ptr<frame_muxer2> muxer_;\r
\r
safe_ptr<core::basic_frame> last_frame_;\r
CASPAR_LOG(warning) << "Failed to open audio-stream. Running without audio."; \r
}\r
\r
- packets_.link_target(&throw_away_);\r
+ Concurrency::connect(packets_, throw_away_);\r
\r
CASPAR_VERIFY(video_decoder_ || audio_decoder_, ffmpeg_error());\r
\r
- muxer_.reset(new frame_muxer2(video_source, audio_source, frames_, video_decoder_ ? video_decoder_->fps() : frame_factory->get_video_format_desc().fps, frame_factory, filter));\r
+ muxer_.reset(new frame_muxer2(video_source, audio_source, frames_, video_decoder_ ? video_decoder_->fps() : frame_factory->get_video_format_desc().fps, frame_factory));\r
\r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
graph_->start();\r
\r
try\r
{ \r
- frame = last_frame_ = Concurrency::receive(frames_, 5);\r
+ frame = last_frame_ = Concurrency::receive(frames_, 10);\r
graph_->update_text(narrow(print()));\r
}\r
catch(operation_timed_out&)\r
mutable single_assignment<safe_ptr<filter>> filter_;\r
const safe_ptr<core::frame_factory> frame_factory_;\r
\r
- call<std::shared_ptr<AVFrame>> push_video_;\r
- call<std::shared_ptr<core::audio_buffer>> push_audio_;\r
+ call<safe_ptr<AVFrame>> push_video_;\r
+ call<safe_ptr<core::audio_buffer>> push_audio_;\r
\r
unbounded_buffer<safe_ptr<AVFrame>> video_;\r
unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
done();\r
}\r
\r
- void push_video(const std::shared_ptr<AVFrame>& video_frame)\r
+ void push_video(const safe_ptr<AVFrame>& video_frame)\r
{ \r
- if(!video_frame)\r
- return;\r
-\r
if(video_frame == eof_video() || video_frame == empty_video())\r
{\r
- send(video_, make_safe_ptr(video_frame));\r
+ send(video_, video_frame);\r
return;\r
}\r
\r
}\r
}\r
\r
- void push_audio(const std::shared_ptr<core::audio_buffer>& audio_samples)\r
+ void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
{\r
- if(!audio_samples)\r
- return;\r
-\r
if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
{\r
- send(audio_, make_safe_ptr(audio_samples));\r
+ send(audio_, audio_samples);\r
return;\r
}\r
\r
{\r
public:\r
\r
- typedef Concurrency::ISource<std::shared_ptr<AVFrame>> video_source_t;\r
- typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>> audio_source_t;\r
+ typedef Concurrency::ISource<safe_ptr<AVFrame>> video_source_t;\r
+ typedef Concurrency::ISource<safe_ptr<core::audio_buffer>> audio_source_t;\r
typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
\r
frame_muxer2(video_source_t* video_source,\r
target_t& target,\r
double in_fps, \r
const safe_ptr<core::frame_factory>& frame_factory,\r
- const std::wstring& filter);\r
+ const std::wstring& filter = L"");\r
\r
int64_t calc_nb_frames(int64_t nb_frames) const;\r
private:\r
\r
#include <core/video_format.h>\r
\r
-#include <common/concurrency/message.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
tbb::atomic<size_t> nb_frames_;\r
tbb::atomic<size_t> nb_loops_; \r
- \r
- overwrite_buffer<bool> is_running_;\r
+ tbb::atomic<size_t> packets_count_;\r
+ tbb::atomic<size_t> packets_size_;\r
\r
- safe_ptr<semaphore> semaphore_;\r
+ bool stop_;\r
\r
public:\r
explicit implementation(input::target_t& target,\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
- , semaphore_(make_safe<semaphore>(32))\r
+ , stop_(false)\r
{ \r
+ packets_count_ = 0;\r
+ packets_size_ = 0;\r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
}\r
-\r
- ~implementation()\r
- {\r
- send(is_running_, false);\r
- semaphore_->release();\r
- semaphore_->release();\r
- agent::wait(this);\r
- CASPAR_LOG(info) << print() << " Stopped.";\r
- }\r
\r
void stop()\r
{\r
- send(is_running_, false);\r
- semaphore_->release();\r
- semaphore_->release();\r
- CASPAR_LOG(info) << print() << " Stopping.";\r
+ stop_ = true;\r
+ agent::wait(this);\r
}\r
\r
virtual void run()\r
{\r
try\r
{\r
- send(is_running_, true);\r
- while(is_running_.value())\r
+ while(!stop_)\r
{\r
auto packet = read_next_packet();\r
if(!packet)\r
break;\r
\r
Concurrency::asend(target_, make_safe_ptr(packet));\r
- Concurrency::wait(2);\r
+ Concurrency::wait(20);\r
}\r
}\r
catch(...)\r
auto size = packet->size;\r
auto data = packet->data; \r
\r
- packet = safe_ptr<AVPacket>(packet.get(), [this, packet, size, data](AVPacket*)\r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
{\r
packet->size = size;\r
packet->data = data;\r
- semaphore_->release();\r
+ --packets_count_;\r
});\r
- semaphore_->acquire();\r
- \r
+\r
+ ++packets_count_;\r
+ \r
return packet;\r
}\r
\r
fix_meta_data(*context);\r
return context;\r
}\r
+//\r
+//void av_dup_frame(AVFrame* frame)\r
+//{\r
+// AVFrame* new_frame = avcodec_alloc_frame();\r
+//\r
+//\r
+// const uint8_t *src_data[4] = {0};\r
+// memcpy(const_cast<uint8_t**>(&src_data[0]), frame->data, 4);\r
+// const int src_linesizes[4] = {0};\r
+// memcpy(const_cast<int*>(&src_linesizes[0]), frame->linesize, 4);\r
+//\r
+// av_image_alloc(new_frame->data, new_frame->linesize, new_frame->width, new_frame->height, frame->format, 16);\r
+//\r
+// av_image_copy(new_frame->data, new_frame->linesize, src_data, src_linesizes, frame->format, new_frame->width, new_frame->height);\r
+//\r
+// frame =\r
+//}\r
\r
}}
\ No newline at end of file
safe_ptr<AVCodecContext> open_codec(AVFormatContext& context, enum AVMediaType type, int& index);\r
safe_ptr<AVFormatContext> open_input(const std::wstring& filename);\r
\r
-\r
-//safe_ptr<AVFrame> copy_av_frame(safe_ptr<AVFrame> av_frame)\r
-//{\r
-// const uint8_t *src_data[4] = {0};\r
-// memcpy(const_cast<uint8_t**>(&src_data[0]), av_frame->data, 4);\r
-// const int src_linesizes[4] = {0};\r
-// memcpy(const_cast<int*>(&src_linesizes[0]), av_frame->linesize, 4);\r
-//\r
-// auto av_frame2 = get_av_frame();\r
-// av_image_alloc(av_frame2->data, av_frame2->linesize, av_frame2->width, av_frame2->height, PIX_FMT_BGRA, 16);\r
-// av_frame = safe_ptr<AVFrame>(av_frame2.get(), [=](AVFrame*)\r
-// {\r
-// av_freep(&av_frame2->data[0]);\r
-// });\r
-//\r
-// av_image_copy(av_frame2->data, av_frame2->linesize, src_data, src_linesizes, PIX_FMT_BGRA, av_frame2->width, av_frame2->height);\r
-//\r
-// return av_frame;\r
-//}\r
+//void av_dup_frame(AVFrame* frame);\r
\r
}}
\ No newline at end of file
#include "../../ffmpeg_error.h"\r
\r
#include <core/producer/frame/basic_frame.h>\r
-#include <common/concurrency/message.h>\r
+#include <common/memory/memcpy.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
-#include <agents.h>\r
+#include <connect.h>\r
#include <semaphore.h>\r
\r
+#include <tbb/scalable_allocator.h>\r
+\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct video_decoder::implementation : boost::noncopyable\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
int index_;\r
- safe_ptr<AVCodecContext> codec_context_; \r
- const double fps_;\r
- const int64_t nb_frames_;\r
- const size_t width_;\r
- const size_t height_;\r
+ std::shared_ptr<AVCodecContext> codec_context_;\r
+ \r
+ double fps_;\r
+ int64_t nb_frames_;\r
\r
+ size_t width_;\r
+ size_t height_;\r
bool is_progressive_;\r
- \r
- safe_ptr<semaphore> semaphore_;\r
-\r
- transformer<safe_ptr<AVPacket>, std::shared_ptr<AVFrame>> transformer_;\r
-\r
+ \r
+ overwrite_buffer<bool> is_running_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> source_;\r
+ ITarget<safe_ptr<AVFrame>>& target_;\r
+ \r
public:\r
explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
: codec_context_(open_codec(context, AVMEDIA_TYPE_VIDEO, index_))\r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
, is_progressive_(true)\r
- , semaphore_(make_safe<semaphore>(1))\r
- , transformer_([this](const safe_ptr<AVPacket>& packet){return (*this)(packet);}, &target,\r
- [this](const safe_ptr<AVPacket>& packet){return packet->stream_index == index_;})\r
+ , source_([this](const safe_ptr<AVPacket>& packet)\r
+ {\r
+ return packet->stream_index == index_;\r
+ })\r
+ , target_(target)\r
{ \r
- source.link_target(&transformer_);\r
CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
+ \r
+ CASPAR_VERIFY(width_ > 0, ffmpeg_error());\r
+ CASPAR_VERIFY(height_ > 0, ffmpeg_error());\r
+\r
+ Concurrency::connect(source, source_);\r
+\r
+ start();\r
}\r
\r
~implementation()\r
{\r
- semaphore_->release();\r
+ send(is_running_, false);\r
+ agent::wait(this);\r
}\r
- \r
- std::shared_ptr<AVFrame> operator()(const safe_ptr<AVPacket>& packet)\r
- { \r
+\r
+ virtual void run()\r
+ {\r
try\r
{\r
- auto tok = make_safe<token>(semaphore_);\r
-\r
- if(packet == loop_packet(index_))\r
+ send(is_running_, true);\r
+ while(is_running_.value())\r
{\r
- avcodec_flush_buffers(codec_context_.get());\r
- return loop_video();\r
- }\r
+ auto packet = receive(source_);\r
+ \r
+ if(packet == loop_packet(index_))\r
+ {\r
+ send(target_, loop_video());\r
+ continue;\r
+ }\r
\r
- if(packet == eof_packet(index_)) \r
- return eof_video(); \r
+ if(packet == eof_packet(index_))\r
+ break;\r
\r
- CASPAR_ASSERT(packet->size > 0);\r
+ std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), av_free);\r
\r
- std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, tok](AVFrame* frame)\r
- {\r
- av_free(frame);\r
- });\r
-\r
- int frame_finished = 0;\r
- THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
+ int frame_finished = 0;\r
+ THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
\r
- // 1 packet <=> 1 frame.\r
- // If a decoder consumes less then the whole packet then something is wrong\r
- // that might be just harmless padding at the end, or a problem with the\r
- // AVParser or demuxer which puted more then one frame in a AVPacket.\r
+ // 1 packet <=> 1 frame.\r
+ // If a decoder consumes less then the whole packet then something is wrong\r
+ // that might be just harmless padding at the end, or a problem with the\r
+ // AVParser or demuxer which puted more then one frame in a AVPacket.\r
\r
- if(frame_finished == 0) \r
- return nullptr;\r
-\r
- if(decoded_frame->repeat_pict > 0)\r
- CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
+ if(frame_finished == 0) \r
+ continue;\r
+ \r
+ if(decoded_frame->repeat_pict > 0)\r
+ CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
\r
- is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+ is_progressive_ = decoded_frame->interlaced_frame == 0;\r
\r
- return decoded_frame;\r
+ // Need to dupliace frame data since avcodec_decode_video2 reuses it.\r
+ send(target_, dup_frame(make_safe_ptr(decoded_frame)));\r
+ Concurrency::wait(10);\r
+ }\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- return eof_video();\r
}\r
+ \r
+ send(is_running_, false),\r
+ send(target_, eof_video());\r
+\r
+ done();\r
+ }\r
+\r
+ safe_ptr<AVFrame> dup_frame(const safe_ptr<AVFrame>& frame)\r
+ {\r
+ std::array<uint8_t*, 4> data;\r
+ parallel_for(0, 4, [&](int n)\r
+ {\r
+ data[n] = frame->data[n];\r
+ frame->data[n] = reinterpret_cast<uint8_t*>(scalable_aligned_malloc(frame->linesize[n]*frame->height, 32));\r
+ memcpy(frame->data[n], data[n], frame->linesize[n]*frame->height);\r
+ });\r
+\r
+ return safe_ptr<AVFrame>(frame.get(), [frame, data](AVFrame*)\r
+ {\r
+ for(int n = 0; n < 4; ++n)\r
+ {\r
+ scalable_aligned_free(frame->data[n]);\r
+ frame->data[n] = data[n];\r
+ }\r
+ });\r
}\r
\r
double fps() const\r
}\r
};\r
\r
-video_decoder::video_decoder(source_t& source, target_t& target, AVFormatContext& context) \r
+video_decoder::video_decoder(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
: impl_(new implementation(source, target, context))\r
{\r
}\r
public:\r
\r
typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
- typedef Concurrency::ITarget<std::shared_ptr<AVFrame>> target_t;\r
+ typedef Concurrency::ITarget<safe_ptr<AVFrame>> target_t;\r
\r
explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
\r
<diagnostics>\r
<graphs>true</graphs>\r
</diagnostics>\r
- <consumers>\r
- <buffer-depth>3</buffer-depth>\r
- </consumers>\r
<mixers>\r
<blend-modes>false</blend-modes>\r
</mixers>\r
<producers>\r
- <buffer-depth>1</buffer-depth>\r
<auto-transcode>true</auto-transcode>\r
<template-hosts>\r
<template-host>\r
<width>1280</width>\r
<height>720</height>\r
</template-host>\r
+ <template-host>\r
+ <video-mode>PAL</video-mode>\r
+ <filename>cg.fth.18</filename>\r
+ <width>1280</width>\r
+ <height>720</height>\r
+ </template-host>\r
</template-hosts>\r
</producers>\r
<channels>\r
\r
#include <core/mixer/gpu/ogl_device.h>\r
\r
-#include <agents.h>\r
+#include <concrt.h>\r
\r
#include <boost/property_tree/detail/file_parser_error.hpp>\r
\r
inc_prec(){timeBeginPeriod(1);}\r
~inc_prec(){timeEndPeriod(1);}\r
} inc_prec; \r
+\r
+ //Concurrency::Scheduler::SetDefaultSchedulerPolicy(Concurrency::SchedulerPolicy(1, Concurrency::SchedulerKind, Concurrency::UmsThreadDefault));\r
\r
try \r
{\r