#include <tbb/atomic.h>\r
\r
#include <boost/noncopyable.hpp>\r
-#include <boost/any.hpp>\r
+\r
+#include <vector>\r
\r
namespace caspar {\r
\r
// };\r
//}\r
\r
-typedef safe_ptr<int> ticket_t;\r
+#undef Yield\r
+\r
+typedef std::vector<safe_ptr<int>> ticket_t;\r
\r
class governor : boost::noncopyable\r
{\r
tbb::atomic<int> count_;\r
Concurrency::concurrent_queue<Concurrency::Context*> waiting_contexts_;\r
\r
- void acquire_ticket(Concurrency::Context* context)\r
+ void acquire_ticket()\r
{\r
+ if(count_ < 1)\r
+ Concurrency::Context::Yield();\r
+\r
if (--count_ < 0)\r
{\r
+ auto context = Concurrency::Context::CurrentContext();\r
waiting_contexts_.push(context);\r
context->Block();\r
}\r
{\r
Concurrency:: Context* waiting = NULL;\r
while(!waiting_contexts_.try_pop(waiting))\r
- Concurrency::wait(0);\r
+ Concurrency::Context::Yield();\r
waiting->Unblock();\r
}\r
}\r
\r
ticket_t acquire()\r
{\r
- acquire_ticket(Concurrency::Context::CurrentContext());\r
+ acquire_ticket();\r
\r
- return safe_ptr<int>(new int, [this](int* p)\r
+ ticket_t ticket;\r
+ ticket.push_back(safe_ptr<int>(new int, [this](int* p)\r
{\r
delete p;\r
release_ticket();\r
- });\r
+ }));\r
+ return ticket;\r
}\r
\r
void cancel()\r
{\r
- while(count_ <= 0)\r
+ while(count_ < 0)\r
release_ticket();\r
}\r
};\r
\r
void execute(const output::source_element_t& element)\r
{ \r
- auto frame = element->first;\r
+ auto frame = element.first;\r
\r
{\r
critical_section::scoped_lock lock(mutex_); \r
class output : boost::noncopyable\r
{\r
public:\r
- typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>> source_element_t;\r
- typedef Concurrency::ISource<source_element_t> source_t;\r
+ typedef std::pair<safe_ptr<core::read_frame>, ticket_t> source_element_t;\r
+ typedef Concurrency::ISource<source_element_t> source_t;\r
\r
explicit output(source_t& source, const video_format_desc& format_desc);\r
\r
\r
mixer::target_element_t mix(const mixer::source_element_t& element)\r
{ \r
- auto frames = element->first;\r
+ auto frames = element.first;\r
\r
auto frame = make_safe<read_frame>();\r
\r
Concurrency::wait(20);\r
}\r
\r
- return mixer::target_element_t(std::make_pair(std::move(frame), element->second)); \r
+ return mixer::target_element_t(std::move(frame), element.second); \r
}\r
\r
boost::unique_future<safe_ptr<core::write_frame>> async_create_frame(const void* tag, const core::pixel_format_desc& desc)\r
{\r
public: \r
\r
- typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> source_element_t;\r
- typedef safe_ptr<std::pair<safe_ptr<core::read_frame>, ticket_t>> target_element_t;\r
+ typedef std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t> source_element_t;\r
+ typedef std::pair<safe_ptr<core::read_frame>, ticket_t> target_element_t;\r
\r
typedef Concurrency::ISource<source_element_t> source_t;\r
typedef Concurrency::ITarget<target_element_t> target_t;\r
{\r
public:\r
\r
- typedef safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>> target_element_t;\r
- typedef Concurrency::ITarget<target_element_t> target_t;\r
+ typedef std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t> target_element_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
explicit stage(target_t& target, governor& governor);\r
\r
\r
struct video_channel::implementation : boost::noncopyable\r
{\r
- unbounded_buffer<safe_ptr<std::pair<std::map<int, safe_ptr<basic_frame>>, ticket_t>>> stage_frames_;\r
- unbounded_buffer<safe_ptr<std::pair<safe_ptr<read_frame>, ticket_t>>> mixer_frames_;\r
+ unbounded_buffer<stage::target_element_t> stage_frames_;\r
+ unbounded_buffer<mixer::target_element_t> mixer_frames_;\r
\r
const video_format_desc format_desc_;\r
\r
\r
class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
{ \r
- Concurrency::bounded_buffer<safe_ptr<AVFrame>> video_frames_;\r
- Concurrency::bounded_buffer<safe_ptr<core::audio_buffer>> audio_buffers_;\r
- Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> muxed_frames_;\r
+ Concurrency::bounded_buffer<ffmpeg::frame_muxer2::video_source_element_t> video_frames_;\r
+ Concurrency::bounded_buffer<ffmpeg::frame_muxer2::audio_source_element_t> audio_buffers_;\r
+ Concurrency::bounded_buffer<ffmpeg::frame_muxer2::target_element_t> muxed_frames_;\r
\r
const core::video_format_desc format_desc_;\r
const size_t device_index_;\r
\r
try\r
{\r
- last_frame_ = frame = Concurrency::receive(muxed_frames_);\r
+ auto frame_element = Concurrency::receive(muxed_frames_);\r
+ last_frame_ = frame = frame_element.first;\r
}\r
catch(Concurrency::operation_timed_out&)\r
{ \r
Concurrency::parallel_invoke(\r
[&]\r
{\r
- Concurrency::send(video_frames_, av_frame); \r
+ Concurrency::send(video_frames_, ffmpeg::frame_muxer2::video_source_element_t(av_frame, ticket_t())); \r
},\r
[&]\r
{ \r
{\r
auto sample_frame_count = audio->GetSampleFrameCount();\r
auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
- Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
+ Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), ticket_t()));\r
}\r
else\r
- Concurrency::send(audio_buffers_, ffmpeg::empty_audio()); \r
+ Concurrency::send(audio_buffers_, ffmpeg::frame_muxer2::audio_source_element_t(ffmpeg::empty_audio(), ticket_t())); \r
});\r
}\r
\r
#pragma warning (pop)\r
#endif\r
\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- overwrite_buffer<bool> is_running_;\r
- unbounded_buffer<safe_ptr<AVPacket>> source_;\r
- ITarget<safe_ptr<core::audio_buffer>>& target_;\r
+ unbounded_buffer<audio_decoder::source_element_t> source_;\r
+ ITarget<audio_decoder::target_element_t>& target_;\r
\r
public:\r
explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
format_desc.audio_sample_rate, codec_context_->sample_rate,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , source_([this](const safe_ptr<AVPacket>& packet)\r
- {\r
- return packet->stream_index == index_;\r
- })\r
+ , source_([this](const audio_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
, target_(target)\r
{ \r
CASPAR_LOG(debug) << "[audio_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
~implementation()\r
{\r
- send(is_running_, false);\r
agent::wait(this);\r
}\r
\r
{\r
try\r
{\r
- send(is_running_, true);\r
- while(is_running_.value())\r
+ while(true)\r
{ \r
- auto packet = receive(source_);\r
+ auto element = receive(source_);\r
+ auto packet = element.first;\r
\r
if(packet == loop_packet(index_))\r
{\r
- send(target_, loop_audio());\r
+ send(target_, audio_decoder::target_element_t(loop_audio(), ticket_t()));\r
continue;\r
}\r
\r
const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
\r
- send(target_, make_safe<core::audio_buffer>(samples, samples + n_samples));\r
+ send(target_, audio_decoder::target_element_t(make_safe<core::audio_buffer>(samples, samples + n_samples), element.second));\r
+ Context::Yield();\r
}\r
- Concurrency::wait(5);\r
}\r
}\r
catch(...)\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
\r
- send(is_running_, false);\r
- send(target_, eof_audio());\r
+ send(target_, audio_decoder::target_element_t(eof_audio(), ticket_t()));\r
\r
done();\r
}\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
{\r
public:\r
\r
- typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
- typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
+ typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
+ typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t> target_element_t;\r
+\r
+ typedef Concurrency::ISource<source_element_t>& source_t;\r
+ typedef Concurrency::ITarget<target_element_t>& target_t;\r
\r
explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
\r
\r
struct ffmpeg_producer : public core::frame_producer\r
{ \r
- const std::wstring filename_;\r
- const int start_;\r
- const bool loop_;\r
- const size_t length_;\r
+ const std::wstring filename_;\r
+ const int start_;\r
+ const bool loop_;\r
+ const size_t length_;\r
\r
- call<safe_ptr<AVPacket>> throw_away_;\r
- unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
- unbounded_buffer<safe_ptr<AVFrame>> video_;\r
- unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
- bounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
+ call<input::target_element_t> throw_away_;\r
+ unbounded_buffer<input::target_element_t> packets_;\r
+ unbounded_buffer<frame_muxer2::video_source_element_t> video_;\r
+ unbounded_buffer<frame_muxer2::audio_source_element_t> audio_;\r
+ unbounded_buffer<frame_muxer2::target_element_t> frames_;\r
\r
- const safe_ptr<diagnostics::graph> graph_;\r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- input input_; \r
- std::shared_ptr<video_decoder> video_decoder_;\r
- std::shared_ptr<audio_decoder> audio_decoder_; \r
- std::unique_ptr<frame_muxer2> muxer_;\r
+ input input_; \r
+ std::unique_ptr<frame_muxer2> muxer_;\r
+ std::shared_ptr<video_decoder> video_decoder_;\r
+ std::shared_ptr<audio_decoder> audio_decoder_; \r
\r
- safe_ptr<core::basic_frame> last_frame_;\r
+ safe_ptr<core::basic_frame> last_frame_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
- , throw_away_([](const safe_ptr<AVPacket>&){})\r
- , frames_(2)\r
+ , throw_away_([](const input::target_element_t&){})\r
, graph_(diagnostics::create_graph("", false))\r
, input_(packets_, graph_, filename_, loop, start, length)\r
, last_frame_(core::basic_frame::empty())\r
\r
~ffmpeg_producer()\r
{\r
- input_.stop(); \r
- while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
- {\r
- }\r
+ input_.stop(); \r
}\r
\r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
\r
try\r
{ \r
- frame = last_frame_ = Concurrency::receive(frames_, 10);\r
+ auto frame_element = Concurrency::receive(frames_, 10);\r
+ frame = last_frame_ = frame_element.first;\r
graph_->update_text(narrow(print()));\r
}\r
catch(operation_timed_out&)\r
#include <common/exception/exceptions.h>\r
#include <common/log/log.h>\r
\r
+#include <boost/range/algorithm_ext/push_back.hpp>\r
+\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
\r
struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
- ITarget<safe_ptr<core::basic_frame>>& target_;\r
+ typedef std::pair<std::shared_ptr<core::write_frame>, ticket_t> write_element_t;\r
+ typedef std::pair<std::shared_ptr<core::audio_buffer>, ticket_t> audio_element_t;\r
+\r
+ frame_muxer2::video_source_t* video_source_;\r
+ frame_muxer2::audio_source_t* audio_source_;\r
+\r
+ ITarget<frame_muxer2::target_element_t>& target_;\r
mutable single_assignment<display_mode::type> display_mode_;\r
const double in_fps_;\r
const core::video_format_desc format_desc_;\r
\r
mutable single_assignment<safe_ptr<filter>> filter_;\r
const safe_ptr<core::frame_factory> frame_factory_;\r
- \r
- call<safe_ptr<AVFrame>> push_video_;\r
- call<safe_ptr<core::audio_buffer>> push_audio_;\r
- \r
- unbounded_buffer<safe_ptr<AVFrame>> video_;\r
- unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
- \r
+ \r
core::audio_buffer audio_data_;\r
-\r
- overwrite_buffer<bool> is_running_;\r
+ \r
+ std::queue<write_element_t> video_frames_;\r
+ std::queue<audio_element_t> audio_buffers_;\r
\r
std::wstring filter_str_;\r
\r
double in_fps, \r
const safe_ptr<core::frame_factory>& frame_factory,\r
const std::wstring& filter)\r
- : target_(target)\r
+ : video_source_(video_source)\r
+ , audio_source_(audio_source)\r
+ , target_(target)\r
, in_fps_(in_fps)\r
, format_desc_(frame_factory->get_video_format_desc())\r
, auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
, frame_factory_(frame_factory)\r
- , push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
- , push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
- {\r
- if(video_source)\r
- video_source->link_target(&push_video_);\r
- if(audio_source)\r
- audio_source->link_target(&push_audio_);\r
- \r
+ { \r
start();\r
}\r
\r
~implementation()\r
{\r
- send(is_running_, false);\r
agent::wait(this);\r
- CASPAR_LOG(trace) << "[frame_muxer] Stopped.";\r
}\r
-\r
- std::shared_ptr<core::write_frame> receive_video()\r
+ \r
+ write_element_t receive_video()\r
{ \r
- auto video = receive(video_);\r
-\r
- if(!is_running_.value() || video == eof_video())\r
- { \r
- send(is_running_ , false);\r
- return nullptr;\r
+ if(!video_frames_.empty())\r
+ {\r
+ auto video_frame = video_frames_.front();\r
+ video_frames_.pop();\r
+ return video_frame;\r
}\r
\r
- CASPAR_ASSERT(video != loop_video());\r
+ auto element = receive(video_source_);\r
+ auto video = element.first;\r
\r
- try\r
+ if(video == eof_video())\r
+ return write_element_t(nullptr, ticket_t()); \r
+ else if(video == empty_video())\r
+ return write_element_t(make_safe<core::write_frame>(this), ticket_t());\r
+ else if(video != loop_video())\r
{\r
- if(video == empty_video())\r
- return make_safe<core::write_frame>(this);\r
+ if(!display_mode_.has_value())\r
+ initialize_display_mode(*video);\r
\r
- return make_write_frame(this, video, frame_factory_, 0);\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- send(is_running_, false);\r
- return nullptr;\r
- }\r
- }\r
+ auto format = video->format;\r
+ if(video->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+ video->format = PIX_FMT_GRAY8;\r
\r
- std::shared_ptr<core::audio_buffer> receive_audio()\r
- { \r
- auto audio = receive(audio_);\r
+ filter_.value()->push(video);\r
+ while(true)\r
+ {\r
+ auto frame = filter_.value()->poll();\r
+ if(!frame)\r
+ break; \r
\r
- if(!is_running_.value() || audio == eof_audio())\r
- {\r
- send(is_running_ , false);\r
- return nullptr;\r
+ frame->format = format;\r
+				video_frames_.push(write_element_t(make_write_frame(this, make_safe_ptr(frame), frame_factory_, 0), element.second));\r
+ }\r
}\r
\r
- CASPAR_ASSERT(audio != loop_audio());\r
-\r
- try\r
+ return receive_video();\r
+ }\r
+ \r
+ audio_element_t receive_audio()\r
+ { \r
+ if(!audio_buffers_.empty())\r
{\r
- if(audio == empty_audio())\r
- return make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
-\r
- return audio;\r
+ auto audio_buffer = audio_buffers_.front();\r
+ audio_buffers_.pop();\r
+ return audio_buffer;\r
}\r
- catch(...)\r
+ \r
+ auto element = receive(audio_source_);\r
+ auto audio = element.first;\r
+\r
+ if(audio == eof_audio())\r
+ return audio_element_t(nullptr, ticket_t()); \r
+ else if(audio == empty_audio()) \r
+ audio_data_.resize(audio_data_.size() + format_desc_.audio_samples_per_frame, 0);\r
+ else if(audio != loop_audio()) \r
+ audio_data_.insert(audio_data_.end(), audio->begin(), audio->end());\r
+ \r
+ while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
{\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- send(is_running_, false);\r
- return nullptr;\r
+ auto begin = audio_data_.begin(); \r
+ auto end = begin + format_desc_.audio_samples_per_frame;\r
+ auto audio = make_safe<core::audio_buffer>(begin, end);\r
+ audio_data_.erase(begin, end);\r
+			audio_buffers_.push(audio_element_t(audio, element.second));\r
}\r
+\r
+ return receive_audio();\r
}\r
- \r
+ \r
virtual void run()\r
{\r
try\r
{\r
- send(is_running_, true);\r
- while(is_running_.value())\r
+ bool eof = false;\r
+ while(!eof)\r
{\r
- auto audio = receive_audio(); \r
+ ticket_t tickets;\r
+\r
+ auto audio_element = receive_audio();\r
+ boost::range::push_back(tickets, audio_element.second);\r
+\r
+ auto audio = audio_element.first;\r
if(!audio)\r
+ {\r
+ eof = true;\r
break;\r
- \r
- auto video = receive_video();\r
+ }\r
+ \r
+ auto video_element = receive_video();\r
+ boost::range::push_back(tickets, video_element.second);\r
+\r
+ auto video = video_element.first;\r
if(!video)\r
+ {\r
+ eof = true;\r
break;\r
+ }\r
\r
video->audio_data() = std::move(*audio);\r
\r
case display_mode::deinterlace:\r
case display_mode::deinterlace_bob:\r
{\r
- send(target_, safe_ptr<core::basic_frame>(std::move(video)));\r
+ send(target_, frame_muxer2::target_element_t(video, tickets));\r
\r
break;\r
}\r
case display_mode::duplicate: \r
{ \r
- send(target_, safe_ptr<core::basic_frame>(video));\r
+ send(target_, frame_muxer2::target_element_t(video, tickets));\r
\r
- auto audio2 = receive_audio();\r
+ auto audio_element2 = receive_audio();\r
+				boost::range::push_back(tickets, audio_element2.second);\r
+\r
+ auto audio2 = audio_element2.first;\r
if(audio2)\r
{\r
auto video2 = make_safe<core::write_frame>(*video);\r
video2->audio_data() = std::move(*audio2);\r
- send(target_, safe_ptr<core::basic_frame>(video2));\r
+ send(target_, frame_muxer2::target_element_t(video2, tickets));\r
}\r
+ else\r
+ eof = true;\r
\r
break;\r
}\r
case display_mode::half: \r
{ \r
- send(target_, safe_ptr<core::basic_frame>(std::move(video)));\r
- receive_video();\r
+ send(target_, frame_muxer2::target_element_t(video, tickets));\r
+\r
+ if(!receive_video().first)\r
+ eof = true;\r
+\r
break;\r
}\r
case display_mode::deinterlace_bob_reinterlace:\r
case display_mode::interlace: \r
{ \r
auto frame = safe_ptr<core::basic_frame>(std::move(video));\r
- auto video2 = receive_video();\r
+\r
+ auto video_element2 = receive_video();\r
+			boost::range::push_back(tickets, video_element2.second);\r
+\r
+ auto video2 = video_element2.first;\r
auto frame2 = core::basic_frame::empty();\r
\r
if(video2) \r
- frame2 = safe_ptr<core::basic_frame>(video2); \r
+ frame2 = safe_ptr<core::basic_frame>(video2); \r
+ else\r
+ eof = true;\r
\r
frame = core::basic_frame::interlace(std::move(frame), std::move(frame2), format_desc_.field_mode); \r
- send(target_, frame);\r
+ send(target_, frame_muxer2::target_element_t(frame, tickets));\r
\r
break;\r
}\r
default: \r
BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
}\r
- }\r
+ } \r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
\r
- send(is_running_ , false);\r
- send(target_, core::basic_frame::eof());\r
+ send(target_, frame_muxer2::target_element_t(core::basic_frame::eof(), ticket_t()));\r
\r
done();\r
}\r
- \r
- void push_video(const safe_ptr<AVFrame>& video_frame)\r
- { \r
- if(video_frame == eof_video() || video_frame == empty_video())\r
- {\r
- send(video_, video_frame);\r
- return;\r
- }\r
- \r
- if(video_frame == loop_video()) \r
- return; \r
- \r
- try\r
- {\r
- if(!is_running_.value())\r
- return;\r
-\r
- if(!display_mode_.has_value())\r
- initialize_display_mode(*video_frame);\r
- \r
- //send(video_, make_safe_ptr(video_frame));\r
-\r
- //if(hints & core::frame_producer::ALPHA_HINT)\r
- // video_frame->format = make_alpha_format(video_frame->format);\r
- \r
- auto format = video_frame->format;\r
- if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
- video_frame->format = PIX_FMT_GRAY8;\r
-\r
- filter_.value()->push(video_frame);\r
-\r
- while(true)\r
- {\r
- auto frame = filter_.value()->poll();\r
- if(!frame)\r
- break; \r
-\r
- frame->format = format;\r
- send(video_, make_safe_ptr(frame));\r
- }\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- send(is_running_ , false);\r
- send(video_, eof_video());\r
- }\r
- }\r
-\r
- void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
- {\r
- if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
- {\r
- send(audio_, audio_samples);\r
- return;\r
- }\r
-\r
- if(audio_samples == loop_audio()) \r
- return; \r
-\r
- try\r
- { \r
- if(!is_running_.value())\r
- return;\r
-\r
- audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
- \r
- while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
- {\r
- auto begin = audio_data_.begin(); \r
- auto end = begin + format_desc_.audio_samples_per_frame;\r
- \r
- send(audio_, make_safe<core::audio_buffer>(begin, end));\r
-\r
- audio_data_.erase(begin, end);\r
- }\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- send(is_running_ , false);\r
- send(audio_, eof_audio());\r
- }\r
- }\r
\r
void initialize_display_mode(AVFrame& frame)\r
{\r
#include "util.h"\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <core/mixer/audio/audio_mixer.h>\r
\r
{\r
public:\r
\r
- typedef Concurrency::ISource<safe_ptr<AVFrame>> video_source_t;\r
- typedef Concurrency::ISource<safe_ptr<core::audio_buffer>> audio_source_t;\r
- typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
+ typedef std::pair<safe_ptr<AVFrame>, ticket_t> video_source_element_t;\r
+ typedef std::pair<safe_ptr<core::audio_buffer>, ticket_t> audio_source_element_t;\r
+ typedef std::pair<safe_ptr<core::basic_frame>, ticket_t> target_element_t;\r
+\r
+ typedef Concurrency::ISource<video_source_element_t> video_source_t;\r
+ typedef Concurrency::ISource<audio_source_element_t> audio_source_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
frame_muxer2(video_source_t* video_source,\r
audio_source_t* audio_source, \r
#pragma warning (pop)\r
#endif\r
\r
+\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
tbb::atomic<size_t> packets_count_;\r
tbb::atomic<size_t> packets_size_;\r
\r
- bool stop_;\r
+ overwrite_buffer<bool> is_running_;\r
+ governor governor_;\r
\r
public:\r
explicit implementation(input::target_t& target,\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
- , stop_(false)\r
+ , governor_(4)\r
{ \r
packets_count_ = 0;\r
packets_size_ = 0;\r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
\r
- av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
+ //av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
\r
if(start_ > 0) \r
seek_frame(start_);\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
}\r
+\r
+ ~implementation()\r
+ {\r
+ if(is_running_.value())\r
+ stop();\r
+ }\r
\r
void stop()\r
{\r
- stop_ = true;\r
+ send(is_running_, false);\r
+ governor_.cancel();\r
agent::wait(this);\r
}\r
\r
{\r
try\r
{\r
- while(!stop_)\r
+ send(is_running_, true);\r
+ while(is_running_.value())\r
{\r
auto packet = read_next_packet();\r
if(!packet)\r
break;\r
+ \r
+ if(packet->stream_index != default_stream_index_)\r
+ Concurrency::asend(target_, input::target_element_t(packet, governor_.acquire()));\r
+ else\r
+ Concurrency::asend(target_, input::target_element_t(packet, ticket_t()));\r
\r
- Concurrency::asend(target_, make_safe_ptr(packet));\r
- Concurrency::wait(20);\r
+ Context::Yield();\r
}\r
}\r
catch(...)\r
} \r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, eof_packet(stream->index)); \r
+ Concurrency::send(target_, input::target_element_t(eof_packet(stream->index), ticket_t())); \r
\r
done();\r
}\r
packet->size = 0;\r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::asend(target_, loop_packet(stream->index)); \r
+ Concurrency::asend(target_, input::target_element_t(loop_packet(stream->index), ticket_t())); \r
\r
graph_->add_tag("seek"); \r
} \r
#include "util.h"\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <agents.h>\r
#include <concrt.h>\r
{\r
public:\r
\r
- typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
+ typedef std::pair<safe_ptr<AVPacket>, ticket_t> target_element_t;\r
+\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
explicit input(target_t& target, \r
const safe_ptr<diagnostics::graph>& graph, \r
\r
#include <tbb/scalable_allocator.h>\r
\r
+#undef Yield\r
using namespace Concurrency;\r
\r
namespace caspar { namespace ffmpeg {\r
size_t height_;\r
bool is_progressive_;\r
\r
- overwrite_buffer<bool> is_running_;\r
- unbounded_buffer<safe_ptr<AVPacket>> source_;\r
- ITarget<safe_ptr<AVFrame>>& target_;\r
+ unbounded_buffer<video_decoder::source_element_t> source_;\r
+ ITarget<video_decoder::target_element_t>& target_;\r
\r
public:\r
explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
, is_progressive_(true)\r
- , source_([this](const safe_ptr<AVPacket>& packet)\r
- {\r
- return packet->stream_index == index_;\r
- })\r
+ , source_([this](const video_decoder::source_element_t& element){return element.first->stream_index == index_;})\r
, target_(target)\r
{ \r
CASPAR_LOG(debug) << "[video_decoder] " << context.streams[index_]->codec->codec->long_name;\r
\r
~implementation()\r
{\r
- send(is_running_, false);\r
agent::wait(this);\r
}\r
\r
{\r
try\r
{\r
- send(is_running_, true);\r
- while(is_running_.value())\r
+ while(true)\r
{\r
- auto packet = receive(source_);\r
+ auto element = receive(source_);\r
+ auto packet = element.first;\r
\r
if(packet == loop_packet(index_))\r
{\r
- send(target_, loop_video());\r
+ send(target_, target_element_t(loop_video(), ticket_t()));\r
continue;\r
}\r
\r
\r
// C-TODO: Avoid duplication.\r
// Need to dupliace frame data since avcodec_decode_video2 reuses it.\r
- send(target_, dup_frame(make_safe_ptr(decoded_frame)));\r
- Concurrency::wait(10);\r
+ send(target_, target_element_t(dup_frame(make_safe_ptr(decoded_frame)), element.second)); \r
+ Context::Yield();\r
}\r
}\r
catch(...)\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
\r
- send(is_running_, false),\r
- send(target_, eof_video());\r
+ send(target_, target_element_t(eof_video(), ticket_t()));\r
\r
done();\r
}\r
#include "../util.h"\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/governor.h>\r
\r
#include <core/video_format.h>\r
\r
{\r
public:\r
\r
- typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
- typedef Concurrency::ITarget<safe_ptr<AVFrame>> target_t;\r
+ typedef std::pair<safe_ptr<AVPacket>, ticket_t> source_element_t;\r
+ typedef std::pair<safe_ptr<AVFrame>, ticket_t> target_element_t;\r
+\r
+ typedef Concurrency::ISource<source_element_t> source_t;\r
+ typedef Concurrency::ITarget<target_element_t> target_t;\r
\r
explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
\r