namespace caspar { namespace core {\r
\r
ogl_device::ogl_device() \r
- : executor_(L"ogl_device")\r
+ : executor_(new executor(L"ogl_device"))\r
, pattern_(nullptr)\r
, attached_texture_(0)\r
, active_shader_(0)\r
auto& pool = device_pools_[stride-1][((width << 16) & 0xFFFF0000) | (height & 0x0000FFFF)];\r
std::shared_ptr<device_buffer> buffer;\r
if(!pool->items.try_pop(buffer)) \r
- buffer = executor_.invoke([&]{return allocate_device_buffer(width, height, stride);}, high_priority); \r
+ buffer = executor_->invoke([&]{return allocate_device_buffer(width, height, stride);}, high_priority); \r
\r
//++pool->usage_count;\r
\r
auto& pool = host_pools_[usage][size];\r
std::shared_ptr<host_buffer> buffer;\r
if(!pool->items.try_pop(buffer)) \r
- buffer = executor_.invoke([=]{return allocate_host_buffer(size, usage);}, high_priority); \r
+ buffer = executor_->invoke([=]{return allocate_host_buffer(size, usage);}, high_priority); \r
\r
//++pool->usage_count;\r
\r
+ auto exe = executor_;\r
return safe_ptr<host_buffer>(buffer.get(), [=](host_buffer*) mutable\r
{\r
- executor_.begin_invoke([=]() mutable\r
+ exe->begin_invoke([=]() mutable\r
{ \r
if(usage == host_buffer::write_only)\r
buffer->map();\r
\r
+// Cooperatively yields the calling OGL task so other queued executor tasks
+// can run. executor_ is now held via safe_ptr, hence the '->' access.
void ogl_device::yield()
{
- executor_.yield();
+ executor_->yield();
}
\r
boost::unique_future<void> ogl_device::gc()\r
\r
unsigned int fbo_;\r
\r
- executor executor_;\r
+ safe_ptr<executor> executor_;\r
\r
public: \r
ogl_device();\r
+ // Asynchronously queues |func| on the OGL device's executor thread and
+ // returns a future for its result; |priority| orders it in the task queue.
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{ 
- return executor_.begin_invoke(std::forward<Func>(func), priority);
+ return executor_->begin_invoke(std::forward<Func>(func), priority);
}
\r
+ // Synchronously runs |func| on the executor thread, blocking the caller
+ // until it completes, and returns its result.
template<typename Func>
auto invoke(Func&& func, task_priority priority = normal_priority) -> decltype(func())
{
- return executor_.invoke(std::forward<Func>(func), priority);
+ return executor_->invoke(std::forward<Func>(func), priority);
}
\r
safe_ptr<device_buffer> create_device_buffer(size_t width, size_t height, size_t stride);\r
\r
class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
{ \r
- Concurrency::bounded_buffer<std::shared_ptr<AVFrame>> video_frames_;\r
- Concurrency::bounded_buffer<std::shared_ptr<core::audio_buffer>> audio_buffers_;\r
- Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> muxed_frames_;\r
+ Concurrency::bounded_buffer<ffmpeg::video_message_t> video_frames_;\r
+ Concurrency::bounded_buffer<ffmpeg::audio_message_t> audio_buffers_;\r
+ Concurrency::bounded_buffer<ffmpeg::frame_message_t> muxed_frames_;\r
\r
const core::video_format_desc format_desc_;\r
const size_t device_index_;\r
\r
mutable Concurrency::single_assignment<std::wstring> print_;\r
\r
+ safe_ptr<Concurrency::semaphore> semaphore_;\r
+\r
volatile bool is_running_;\r
public:\r
\r
, filter_(filter_str)\r
, muxer_(&video_frames_, &audio_buffers_, muxed_frames_, ffmpeg::double_rate(filter_str) ? format_desc.fps * 2.0 : format_desc.fps, frame_factory)\r
, is_running_(true)\r
+ , semaphore_(make_safe<Concurrency::semaphore>(3))\r
{\r
agent::start();\r
}\r
\r
try\r
{\r
- last_frame_ = frame = safe_ptr<core::basic_frame>(Concurrency::receive(muxed_frames_));\r
+ auto message = Concurrency::receive(muxed_frames_);\r
+ last_frame_ = frame = make_safe_ptr(message->payload);\r
}\r
catch(Concurrency::operation_timed_out&)\r
{ \r
auto frame = filter_.poll();\r
if(!frame)\r
break;\r
- Concurrency::send(video_frames_, frame);\r
+ Concurrency::send(video_frames_, ffmpeg::make_message(frame, std::make_shared<ffmpeg::token>(semaphore_)));\r
}\r
},\r
[&]\r
{\r
auto sample_frame_count = audio->GetSampleFrameCount();\r
auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
- Concurrency::send(audio_buffers_, std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
+ Concurrency::send(audio_buffers_, ffmpeg::make_message(std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), std::make_shared<ffmpeg::token>(semaphore_)));\r
}\r
else\r
- Concurrency::send(audio_buffers_, ffmpeg::empty_audio()); \r
+ Concurrency::send(audio_buffers_, ffmpeg::make_message(ffmpeg::empty_audio(), std::make_shared<ffmpeg::token>(semaphore_))); \r
});\r
}\r
\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- Concurrency::transformer<std::shared_ptr<AVPacket>, std::shared_ptr<core::audio_buffer>> transformer_;\r
+ Concurrency::transformer<packet_message_t, audio_message_t> transformer_;\r
\r
public:\r
explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
format_desc.audio_sample_rate, codec_context_->sample_rate,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const std::shared_ptr<AVPacket>& packet)\r
+ , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
{\r
- return packet && packet->stream_index == index_;\r
+ return message->payload && message->payload->stream_index == index_;\r
})\r
{ \r
CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
Concurrency::connect(source, transformer_);\r
}\r
\r
- std::shared_ptr<core::audio_buffer> decode(const std::shared_ptr<AVPacket>& packet)\r
+ audio_message_t decode(const packet_message_t& message)\r
{ \r
+ auto packet = message->payload;\r
+\r
if(!packet)\r
- return nullptr;\r
+ return make_message(std::shared_ptr<core::audio_buffer>());\r
\r
if(packet == loop_packet(index_))\r
- return loop_audio();\r
+ return make_message(loop_audio());\r
\r
if(packet == eof_packet(index_))\r
- return eof_audio();\r
+ return make_message(eof_audio());\r
\r
- auto result = make_safe<core::audio_buffer>();\r
+ auto result = std::make_shared<core::audio_buffer>();\r
\r
while(packet->size > 0)\r
{\r
result->insert(result->end(), samples, samples + n_samples);\r
}\r
\r
- return result;\r
+ return make_message(result, message->token);\r
}\r
};\r
\r
*/\r
#pragma once\r
\r
+#include "../util.h"\r
+\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <common/memory/safe_ptr.h>\r
{\r
public:\r
\r
- typedef Concurrency::ISource<std::shared_ptr<AVPacket>>& source_t;\r
- typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>>& target_t;\r
+ typedef Concurrency::ISource<packet_message_t>& source_t;\r
+ typedef Concurrency::ITarget<audio_message_t>& target_t;\r
\r
explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
\r
const bool loop_;\r
const size_t length_;\r
\r
- Concurrency::unbounded_buffer<std::shared_ptr<AVPacket>> packets_;\r
- Concurrency::unbounded_buffer<std::shared_ptr<AVFrame>> video_;\r
- Concurrency::unbounded_buffer<std::shared_ptr<core::audio_buffer>> audio_;\r
- Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
+ Concurrency::unbounded_buffer<packet_message_t> packets_;\r
+ Concurrency::unbounded_buffer<video_message_t> video_;\r
+ Concurrency::unbounded_buffer<audio_message_t> audio_;\r
+ Concurrency::bounded_buffer<frame_message_t> frames_;\r
+ Concurrency::call<packet_message_t> throw_away_;\r
\r
const safe_ptr<diagnostics::graph> graph_;\r
\r
- input input_; \r
- std::shared_ptr<video_decoder> video_decoder_;\r
- std::shared_ptr<audio_decoder> audio_decoder_; \r
- Concurrency::call<std::shared_ptr<AVPacket>> throw_away_;\r
- std::unique_ptr<frame_muxer2> muxer_;\r
+ input input_; \r
+ std::shared_ptr<video_decoder> video_decoder_;\r
+ std::shared_ptr<audio_decoder> audio_decoder_; \r
+ std::unique_ptr<frame_muxer2> muxer_;\r
\r
safe_ptr<core::basic_frame> last_frame_;\r
\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
- , throw_away_([](const std::shared_ptr<AVPacket>&){})\r
+ , throw_away_([](const packet_message_t&){})\r
, frames_(2)\r
, graph_(diagnostics::create_graph("", false))\r
, input_(packets_, graph_, filename_, loop, start, length)\r
~ffmpeg_producer()\r
{\r
input_.stop(); \r
- while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
+ while(Concurrency::receive(frames_)->payload != core::basic_frame::eof())\r
{\r
}\r
}\r
\r
try\r
{ \r
- frame = last_frame_ = safe_ptr<core::basic_frame>(Concurrency::receive(frames_, 10));\r
+ auto message = Concurrency::receive(frames_, 10);\r
+ frame = last_frame_ = make_safe_ptr(message->payload);\r
graph_->update_text(narrow(print()));\r
}\r
catch(Concurrency::operation_timed_out&)\r
\r
struct frame_muxer2::implementation : boost::noncopyable\r
{ \r
+ typedef std::shared_ptr<message<core::write_frame>> write_frame_message_t;\r
+\r
display_mode::type display_mode_;\r
const double in_fps_;\r
const video_format_desc format_desc_;\r
bool auto_transcode_;\r
\r
filter filter_;\r
- safe_ptr<core::frame_factory> frame_factory_;\r
+ const safe_ptr<core::frame_factory> frame_factory_;\r
\r
- Concurrency::call<std::shared_ptr<AVFrame>> push_video_;\r
- Concurrency::call<std::shared_ptr<core::audio_buffer>> push_audio_;\r
+ Concurrency::call<video_message_t> push_video_;\r
+ Concurrency::call<audio_message_t> push_audio_;\r
\r
- Concurrency::transformer<safe_ptr<AVFrame>, std::shared_ptr<core::write_frame>> video_;\r
- Concurrency::unbounded_buffer<std::shared_ptr<core::audio_buffer>> audio_;\r
+ Concurrency::transformer<video_message_t, write_frame_message_t> video_;\r
+ Concurrency::unbounded_buffer<audio_message_t> audio_;\r
\r
- typedef std::tuple<std::shared_ptr<core::write_frame>, std::shared_ptr<core::audio_buffer>> join_element_t;\r
+ typedef std::tuple<write_frame_message_t, audio_message_t> join_element_t;\r
\r
- Concurrency::transformer<join_element_t, safe_ptr<core::basic_frame>> merge_;\r
+ Concurrency::transformer<join_element_t, frame_message_t> merge_;\r
safe_ptr<Concurrency::ISource<join_element_t>> join_;\r
\r
core::audio_buffer audio_data_;\r
- std::queue<safe_ptr<AVFrame>> video_frames_;\r
+ std::queue<video_message_t> video_frames_;\r
\r
implementation(frame_muxer2::video_source_t* video_source,\r
frame_muxer2::audio_source_t* audio_source,\r
, format_desc_(frame_factory->get_video_format_desc())\r
, auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
, frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
- , video_(std::bind(&make_write_frame, this, std::placeholders::_1, frame_factory, 0))\r
+ , video_(std::bind(&implementation::make_write_frame, this, std::placeholders::_1))\r
, push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
, push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
, merge_(std::bind(&implementation::merge, this, std::placeholders::_1), &target)\r
join_->link_target(&merge_);\r
}\r
\r
+ // Joins one converted video frame with its matching audio chunk into a
+ // single basic_frame message. A null payload on either side marks the end
+ // of the stream and is forwarded as an eof frame message.
+ // NOTE(review): only the video side's flow-control token is propagated;
+ // the audio message's token is dropped here -- confirm that is intentional.
- safe_ptr<core::basic_frame> merge(const join_element_t& element)
+ frame_message_t merge(const join_element_t& element)
{
- //if(std::get<0>(element) == eof_video() || std::get<1>(element) == eof_audio())
- // return core::basic_frame::eof();
- auto frame = std::get<0>(element);
- frame->audio_data() = std::move(*std::get<1>(element));
- return make_safe_ptr(frame);
+ if(!std::get<0>(element)->payload || !std::get<1>(element)->payload)
+ return make_message(std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));
+
+ auto& frame = std::get<0>(element)->payload;
+ frame->audio_data() = std::move(*std::get<1>(element)->payload);
+ return make_message(std::shared_ptr<core::basic_frame>(frame), std::get<0>(element)->token);
+ }
+\r
+ // Converts a decoded AVFrame message into a core::write_frame message,
+ // forwarding the sender's flow-control token. A null payload (end of
+ // stream) passes through unchanged so downstream stages can terminate.
+ write_frame_message_t make_write_frame(const video_message_t& message)
+ {
+ if(!message->payload)
+ return make_message(std::shared_ptr<core::write_frame>(), message->token);
+
+ auto frame = ffmpeg::make_write_frame(this, make_safe_ptr(message->payload), frame_factory_, 0);
+ return make_message(std::shared_ptr<core::write_frame>(frame), message->token);
}
\r
- void push_video(const std::shared_ptr<AVFrame>& video_frame)\r
+ void push_video(const video_message_t& message)\r
{ \r
+ auto video_frame = message->payload;\r
+\r
if(!video_frame)\r
return;\r
+ \r
+ if(video_frame == eof_video())\r
+ {\r
+ Concurrency::send(video_, make_message(std::shared_ptr<AVFrame>()));\r
+ return;\r
+ }\r
\r
if(video_frame == loop_video()) \r
return; \r
\r
if(video_frame == empty_video())\r
{\r
- Concurrency::send(video_, make_safe_ptr(empty_video()));\r
+ Concurrency::send(video_, make_message(empty_video(), message->token));\r
return;\r
}\r
\r
{ \r
av_frame->format = format;\r
\r
- video_frames_.push(av_frame);\r
+ video_frames_.push(make_message(std::shared_ptr<AVFrame>(av_frame), std::move(message->token)));\r
\r
switch(display_mode_)\r
{\r
{\r
Concurrency::send(video_, video_frames_.front());\r
video_frames_.pop();\r
+\r
break;\r
}\r
case display_mode::duplicate: \r
video_frames_.pop();\r
Concurrency::send(video_, video_frames_.front());\r
video_frames_.pop();\r
+\r
break;\r
}\r
case display_mode::half: \r
}\r
}\r
\r
+ // Accumulates decoded audio samples in audio_data_ and re-chunks them into
+ // buffers of exactly format_desc_.audio_samples_per_frame samples, sending
+ // each chunk to audio_ with the sender's flow-control token attached.
+ // Sentinel payloads (null = eof, loop, empty) are handled specially.
- void push_audio(const std::shared_ptr<core::audio_buffer>& audio_samples)
+ void push_audio(const audio_message_t& message)
{
+ auto audio_samples = message->payload;
+
+ // Null payload: nothing to do for this message.
if(!audio_samples)
return;

+ // eof: forward a null-payload message so the join/merge stage terminates.
+ if(audio_samples == eof_audio())
+ {
+ Concurrency::send(audio_, make_message(std::shared_ptr<core::audio_buffer>()));
+ return;
+ }
+
if(audio_samples == loop_audio()) 
return; 

+ // NOTE(review): there is no 'return' after sending the silence buffer for
+ // empty_audio(); execution falls through to the insert below. Presumably
+ // harmless because empty_audio() carries no samples -- please confirm.
if(audio_samples == empty_audio()) 
- Concurrency::send(audio_, std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0)); 
+ Concurrency::send(audio_, make_message(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0), message->token)); 

audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());

+ // NOTE(review): no visible guard that audio_data_ holds at least
+ // audio_samples_per_frame samples before slicing -- verify the enclosing
+ // loop/condition guarantees this.
auto begin = audio_data_.begin(); 
auto end = begin + format_desc_.audio_samples_per_frame;

- Concurrency::send(audio_, std::make_shared<core::audio_buffer>(begin, end));
+ Concurrency::send(audio_, make_message(std::make_shared<core::audio_buffer>(begin, end), message->token));
audio_data_.erase(begin, end);
}
}\r
#pragma once\r
\r
+#include "util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
\r
#include <core/mixer/audio/audio_mixer.h>\r
{\r
public:\r
\r
- typedef Concurrency::ISource<std::shared_ptr<AVFrame>> video_source_t;\r
- typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>> audio_source_t;\r
- typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
-\r
+ typedef Concurrency::ISource<video_message_t> video_source_t;\r
+ typedef Concurrency::ISource<audio_message_t> audio_source_t;\r
+ typedef Concurrency::ITarget<frame_message_t> target_t;\r
+ \r
frame_muxer2(video_source_t* video_source,\r
audio_source_t* audio_source, \r
target_t& target,\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-static const size_t MAX_BUFFER_COUNT = 32;\r
+static const size_t MAX_TOKENS = 32;\r
\r
struct input::implementation : public Concurrency::agent, boost::noncopyable\r
{\r
tbb::atomic<size_t> nb_frames_;\r
tbb::atomic<size_t> nb_loops_; \r
tbb::atomic<size_t> packets_count_;\r
+ tbb::atomic<size_t> packets_size_;\r
\r
bool stop_;\r
+\r
+ safe_ptr<Concurrency::semaphore> semaphore_;\r
\r
public:\r
explicit implementation(input::target_t& target,\r
, length_(length)\r
, frame_number_(0)\r
, stop_(false)\r
+ , semaphore_(make_safe<Concurrency::semaphore>(MAX_TOKENS))\r
{ \r
packets_count_ = 0;\r
+ packets_size_ = 0;\r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
\r
seek_frame(start_);\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
- graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
- graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
}\r
-\r
+ // Requests the reader agent to stop, then joins it. Releasing
+ // nb_streams+1 semaphore slots unblocks any token acquisition the read
+ // loop may be waiting on, so shutdown cannot deadlock on the
+ // flow-control semaphore. Replaces the old blocking destructor.
- ~implementation()
+ 
+ void stop()
{
+ stop_ = true;
+ for(size_t n = 0; n < format_context_->nb_streams+1; ++n)
+ semaphore_->release();
agent::wait(this);
}
\r
{\r
try\r
{\r
- while(!stop_ && read_next_packet())\r
+ while(!stop_)\r
{\r
+ auto packet = read_next_packet();\r
+ if(!packet)\r
+ break;\r
+\r
+ Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
+ Concurrency::wait(0);\r
+\r
+ //std::vector<std::shared_ptr<AVPacket>> buffer;\r
+\r
+ //while(buffer.size() < 100 && !stop_)\r
+ //{\r
+ // Concurrency::scoped_oversubcription_token oversubscribe;\r
+ // auto packet = read_next_packet();\r
+ // if(!packet)\r
+ // stop_ = true;\r
+ // else\r
+ // buffer.push_back(packet);\r
+ //}\r
+ // \r
+ //std::stable_partition(buffer.begin(), buffer.end(), [this](const std::shared_ptr<AVPacket>& packet)\r
+ //{\r
+ // return packet->stream_index != default_stream_index_;\r
+ //});\r
+\r
+ //BOOST_FOREACH(auto packet, buffer)\r
+ //{\r
+ // Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
+ // Concurrency::wait(0);\r
+ //}\r
}\r
}\r
catch(...)\r
} \r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, eof_packet(stream->index)); \r
- \r
+ {\r
+ Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_))); \r
+ Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_))); \r
+ }\r
+\r
done();\r
}\r
\r
- bool read_next_packet()\r
+ std::shared_ptr<AVPacket> read_next_packet()\r
{ \r
auto packet = create_packet();\r
\r
else\r
{\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
- return false;\r
+ return nullptr;\r
}\r
}\r
- else\r
- { \r
- THROW_ON_ERROR(ret, print(), "av_read_frame");\r
-\r
- if(packet->stream_index == default_stream_index_)\r
- {\r
- if(nb_loops_ == 0)\r
- ++nb_frames_;\r
- ++frame_number_;\r
- }\r
\r
- THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
- \r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = packet->size;\r
- auto data = packet->data; \r
+ THROW_ON_ERROR(ret, print(), "av_read_frame");\r
\r
- packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
- {\r
- packet->size = size;\r
- packet->data = data;\r
- --packets_count_;\r
- graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT); \r
- });\r
+ if(packet->stream_index == default_stream_index_)\r
+ {\r
+ if(nb_loops_ == 0)\r
+ ++nb_frames_;\r
+ ++frame_number_;\r
+ }\r
\r
- ++packets_count_;\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+ \r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data; \r
\r
- Concurrency::asend(target_, packet);\r
- \r
- graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT);\r
- } \r
+ packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data;\r
+ --packets_count_;\r
+ });\r
\r
- return true;\r
+ ++packets_count_;\r
+ \r
+ return packet;\r
}\r
\r
void seek_frame(int64_t frame, int flags = 0)\r
\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
auto packet = create_packet();\r
- packet->size = 0; \r
+ packet->size = 0;\r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, loop_packet(stream->index)); \r
+ Concurrency::send(target_, make_message(loop_packet(stream->index), std::make_shared<token>(semaphore_))); \r
\r
graph_->add_tag("seek"); \r
} \r
\r
+// Delegates to implementation::stop(), which releases semaphore slots and
+// joins the reader agent (previously this only set the stop flag).
void input::stop()
{
- impl_->stop_ = true;
+ impl_->stop();
}
\r
}}
\ No newline at end of file
*/\r
#pragma once\r
\r
+#include "util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
\r
#include <agents.h>\r
{\r
public:\r
\r
- typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
+ typedef Concurrency::ITarget<packet_message_t> target_t;\r
\r
explicit input(target_t& target, \r
const safe_ptr<diagnostics::graph>& graph, \r
const std::shared_ptr<AVPacket>& loop_packet(int index)\r
{\r
static Concurrency::critical_section mutex;\r
- static std::map<int, std::shared_ptr<AVPacket>> packets;\r
-\r
Concurrency::critical_section::scoped_lock lock(mutex);\r
\r
+ static std::map<int, std::shared_ptr<AVPacket>> packets;\r
+ \r
auto& packet = packets[index];\r
if(!packet)\r
{\r
const std::shared_ptr<AVPacket>& eof_packet(int index)\r
{\r
static Concurrency::critical_section mutex;\r
- static std::map<int, std::shared_ptr<AVPacket>> packets;\r
-\r
Concurrency::critical_section::scoped_lock lock(mutex);\r
\r
+ static std::map<int, std::shared_ptr<AVPacket>> packets;\r
+ \r
auto& packet = packets[index];\r
if(!packet)\r
{\r
#endif\r
\r
#include <agents.h>\r
+#include <semaphore.h>\r
\r
struct AVFrame;\r
struct AVFormatContext;\r
\r
namespace ffmpeg {\r
\r
+// RAII flow-control token: acquires one slot from the shared semaphore on
+// construction and releases it on destruction, bounding the number of
+// in-flight messages in the pipeline.
+// NOTE(review): the implicitly-generated copy constructor copies semaphore_
+// WITHOUT acquiring, yet every copy's destructor releases -- copying a token
+// (e.g. capturing it by value in a lambda, as video_decoder's decode() does)
+// releases more slots than were acquired. Consider making token non-copyable
+// and sharing it via std::shared_ptr<token>, as the rest of the code does.
+class token
+{
+ safe_ptr<Concurrency::semaphore> semaphore_;
+public:
+ token(const safe_ptr<Concurrency::semaphore>& semaphore)
+ : semaphore_(semaphore)
+ {
+ semaphore_->acquire();
+ }
+
+ ~token()
+ {
+ semaphore_->release();
+ }
+};
+\r
+// Generic pipeline envelope: couples a payload with an optional flow-control
+// token. While any stage still holds the message, the token keeps one
+// semaphore slot reserved; a null token means the message is unthrottled.
+template <typename T>
+struct message
+{
+ message(const std::shared_ptr<T>& payload, const std::shared_ptr<token>& token = nullptr)
+ : payload(payload)
+ , token(token)
+ {
+ }
+
+ // Carried value; null is used as an end-of-stream sentinel by callers.
+ std::shared_ptr<T> payload;
+ // Optional flow-control token shared by all holders of this message.
+ std::shared_ptr<token> token;
+};
+\r
+// Convenience factory for message<T>; omitting the token creates an
+// unthrottled message.
+template<typename T>
+std::shared_ptr<message<T>> make_message(const std::shared_ptr<T>& payload, const std::shared_ptr<token>& token = nullptr)
+{
+ return std::make_shared<message<T>>(payload, token);
+}
+\r
+typedef std::shared_ptr<message<AVPacket>> packet_message_t;\r
+typedef std::shared_ptr<message<AVFrame>> video_message_t;\r
+typedef std::shared_ptr<message<core::audio_buffer>> audio_message_t;\r
+typedef std::shared_ptr<message<core::basic_frame>> frame_message_t;\r
+ \r
static const PixelFormat CASPAR_PIX_FMT_LUMA = PIX_FMT_MONOBLACK; // Just hijack some unusual pixel format.
\r
core::field_mode::type get_mode(AVFrame& frame);\r
size_t height_;\r
bool is_progressive_;\r
\r
- Concurrency::transformer<std::shared_ptr<AVPacket>, std::shared_ptr<AVFrame>> transformer_;\r
+ Concurrency::transformer<packet_message_t, video_message_t> transformer_;\r
\r
- Concurrency::semaphore semaphore_;\r
+ safe_ptr<Concurrency::semaphore> semaphore_;\r
\r
public:\r
explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
, is_progressive_(true)\r
- , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const std::shared_ptr<AVPacket>& packet)\r
+ , transformer_(std::bind(&implementation::decode, this, std::placeholders::_1), &target, [this](const packet_message_t& message)\r
{\r
- return packet && packet->stream_index == index_;\r
+ return message->payload && message->payload->stream_index == index_;\r
})\r
- , semaphore_(1)\r
+ , semaphore_(make_safe<Concurrency::semaphore>(1))\r
{ \r
CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
Concurrency::connect(source, transformer_);\r
}\r
\r
+ // Decodes one packet message into a video frame message. Null, loop and
+ // eof sentinel packets are translated into the corresponding sentinel
+ // frame messages. The AVFrame deleter captures a flow-control token so a
+ // semaphore slot stays reserved until the decoded frame is freed.
+ // NOTE(review): 'token' is constructed locally and captured BY VALUE in
+ // the deleter lambda; token's implicit copy does not acquire, so the
+ // semaphore (created with count 1) is released twice for a single acquire
+ // -- once when the local goes out of scope and once when the frame is
+ // freed. Verify against the intended single-frame throttling.
+ // NOTE(review): "[video_decocer]" in the error string below is misspelled.
- std::shared_ptr<AVFrame> decode(const std::shared_ptr<AVPacket>& packet)
+ video_message_t decode(const packet_message_t& message)
{
+ auto packet = message->payload;
+
if(!packet)
- return nullptr;
+ return make_message(std::shared_ptr<AVFrame>());

if(packet == loop_packet(index_))
- return loop_video();
+ return make_message(loop_video());

if(packet == eof_packet(index_))
- return eof_video();
+ return make_message(eof_video());

- std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this](AVFrame* frame)
+ token token(semaphore_);
+ std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)
{
av_free(frame);
- semaphore_.release();
});
- semaphore_.acquire();

int frame_finished = 0;
THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");
// ... or a problem with the AVParser or demuxer which put more than one frame in an AVPacket.

if(frame_finished == 0) 
- return nullptr;
+ return make_message(std::shared_ptr<AVFrame>());

if(decoded_frame->repeat_pict > 0)
CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";

is_progressive_ = decoded_frame->interlaced_frame == 0;

+ // NOTE(review): fixed 10 ms wait before returning -- presumably paces the
+ // pipeline; confirm it is intentional.
+ Concurrency::wait(10);
- return decoded_frame;
+ return make_message(decoded_frame, message->token);
}
\r
double fps() const\r
*/\r
#pragma once\r
\r
+#include "../util.h"\r
+\r
#include <common/memory/safe_ptr.h>\r
\r
#include <core/video_format.h>\r
{\r
public:\r
\r
- typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
- typedef Concurrency::ITarget<std::shared_ptr<AVFrame>> target_t;\r
+ typedef Concurrency::ISource<packet_message_t> source_t;\r
+ typedef Concurrency::ITarget<video_message_t> target_t;\r
\r
explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
\r
\r
public:\r
flash_renderer(const safe_ptr<diagnostics::graph>& graph, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) \r
- : graph_(graph)\r
- , filename_(filename)\r
+ : filename_(filename)\r
, frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
+ , graph_(graph)\r
, ax_(nullptr)\r
, head_(core::basic_frame::empty())\r
, bmp_(width, height)\r
CASPAR_LOG(info) << print() << L" Thread ended.";\r
}\r
\r
- void make_write_frame(const std::shared_ptr<bitmap>& bmp)\r
- {\r
-\r
- }\r
-\r
void param(const std::wstring& param)\r
{ \r
if(!ax_->FlashCall(param))\r
\r
safe_ptr<core::basic_frame> render_frame(bool has_underflow)\r
{\r
- float frame_time = 1.0f/ax_->GetFPS();\r
+ const float frame_time = 1.0f/ax_->GetFPS();\r
\r
graph_->update_value("tick-time", static_cast<float>(tick_timer_.elapsed()/frame_time)*0.5f);\r
tick_timer_.restart();\r
~co_init() {CoUninitialize();}\r
} init;\r
\r
- flash_renderer renderer(safe_ptr<diagnostics::graph>(graph_), frame_factory_, filename_, width_, height_);\r
+ flash_renderer renderer(graph_, frame_factory_, filename_, width_, height_);\r
\r
is_running_ = true;\r
while(is_running_)\r
</producers>\r
<channels>\r
<channel>\r
- <video-mode>1080p5000</video-mode>\r
+ <video-mode>720p5000</video-mode>\r
<consumers>\r
<decklink>\r
<device>1</device>\r