From 58439eaecd6ed455b187e9965cf333073ead46de Mon Sep 17 00:00:00 2001 From: ronag Date: Mon, 17 Oct 2011 23:40:55 +0000 Subject: [PATCH] 2.0.1: ffmpeg: Replaced TBB implementation with better Concurrency Runtime based implementation. git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.1@1360 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- common/common.vcxproj | 2 + common/common.vcxproj.filters | 9 + common/concrt/bounded_buffer.h | 584 ++++++++++++++++++ common/concrt/scoped_oversubscription_token.h | 24 + common/diagnostics/graph.cpp | 24 +- common/diagnostics/graph.h | 13 +- core/mixer/image/image_mixer.cpp | 6 + core/mixer/image/image_mixer.h | 2 +- core/mixer/mixer.cpp | 14 +- core/mixer/mixer.h | 6 +- core/producer/frame/frame_factory.h | 51 +- modules/decklink/interop/DeckLinkAPI_h.h | 2 +- modules/decklink/interop/DeckLinkAPI_i.c | 2 +- modules/ffmpeg/StdAfx.h | 5 +- .../ffmpeg/producer/audio/audio_decoder.cpp | 120 ++-- modules/ffmpeg/producer/audio/audio_decoder.h | 17 +- .../ffmpeg/producer/audio/audio_resampler.cpp | 2 +- .../ffmpeg/producer/audio/audio_resampler.h | 2 + modules/ffmpeg/producer/ffmpeg_producer.cpp | 178 ++---- .../ffmpeg/producer/filter/parallel_yadif.cpp | 11 +- modules/ffmpeg/producer/frame_muxer.cpp | 364 +++++++++++ modules/ffmpeg/producer/frame_muxer.h | 23 + modules/ffmpeg/producer/input.cpp | 189 +++--- modules/ffmpeg/producer/input.h | 22 +- modules/ffmpeg/producer/util.cpp | 74 ++- modules/ffmpeg/producer/util.h | 23 +- .../ffmpeg/producer/video/video_decoder.cpp | 179 +++--- modules/ffmpeg/producer/video/video_decoder.h | 21 +- modules/ffmpeg/tbb_avcodec.cpp | 25 +- shell/casparcg.config | 7 +- 30 files changed, 1556 insertions(+), 445 deletions(-) create mode 100644 common/concrt/bounded_buffer.h create mode 100644 common/concrt/scoped_oversubscription_token.h diff --git a/common/common.vcxproj b/common/common.vcxproj index 3a7700691..3157645d4 100644 --- a/common/common.vcxproj +++ b/common/common.vcxproj @@ -209,6 +209,8 @@ + + diff --git a/common/common.vcxproj.filters b/common/common.vcxproj.filters index 54cce3a53..8ebdeab3c 100644 --- a/common/common.vcxproj.filters +++ b/common/common.vcxproj.filters @@ -37,6 +37,9 @@ {28c25c8a-1277-4d2c-9e85-5af33f9938ea} + + {d3ba93c7-82fb-4e69-97bc-cdbf7d6feb24} + @@ -127,5 +130,11 @@ source\utility + + source\concrt + + + source\concrt + \ No newline at end of file diff --git a/common/concrt/bounded_buffer.h b/common/concrt/bounded_buffer.h new file mode 100644 index 000000000..c167e33e2 --- /dev/null +++ b/common/concrt/bounded_buffer.h @@ -0,0 +1,584 @@ +#pragma once + +#include + +namespace Concurrency +{ + +/// +/// A bounded_buffer implementation. Once the capacity is reached it will save the offered message +/// id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume +/// any of the postponed messages. Preference is given to previously offered messages before new ones. +/// +/// NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. +/// Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer +/// implementation uses a critical_section, several interlocked operations, and additional calls to async_send. +/// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem. +/// +/// +/// The payload type of messages stored and propagated by the buffer. +/// +template +class bounded_buffer : public propagator_block>, multi_link_registry>> +{ +public: + /// + /// Creates an bounded_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + bounded_buffer(const size_t capacity) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(); + } + + /// + /// Creates an bounded_buffer within the default scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(); + register_filter(_Filter); + } + + /// + /// Creates an bounded_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + bounded_buffer(const size_t capacity, Scheduler& _PScheduler) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(&_PScheduler); + } + + /// + /// Creates an bounded_buffer within the specified scheduler, and places it any schedule + /// group of the scheduler’s choosing. + /// + /// + /// A reference to a scheduler instance. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(&_PScheduler); + register_filter(_Filter); + } + + /// + /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + } + + /// + /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied + /// by the schedule group. + /// + /// + /// A reference to a schedule group. + /// + /// + /// A reference to a filter function. + /// + bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) + : _M_capacity(capacity), _M_currentSize(0) + { + initialize_source_and_target(NULL, &_PScheduleGroup); + register_filter(_Filter); + } + + /// + /// Cleans up any resources that may have been used by the bounded_buffer. + /// + ~bounded_buffer() + { + // Remove all links + remove_network_links(); + } + + /// + /// Add an item to the bounded_buffer. + /// + /// + /// A reference to the item to add. + /// + /// + /// A boolean indicating whether the data was accepted. + /// + bool enqueue(_Type const& _Item) + { + return Concurrency::send<_Type>(this, _Item); + } + + /// + /// Remove an item from the bounded_buffer. + /// + /// + /// The message payload. + /// + _Type dequeue() + { + return receive<_Type>(this); + } + +protected: + + /// + /// The main propagate() function for ITarget blocks. Called by a source + /// block, generally within an asynchronous task to send messages to its targets. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + /// + /// It is important that calls to propagate do *not* take the same lock on the + /// internal structure that is used by Consume and the LWT. Doing so could + /// result in a deadlock with the Consume call. (in the case of the bounded_buffer, + /// this lock is the m_internalLock) + /// + virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + + // Check current capacity. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + // Postpone the message, buffer is full. + _InterlockedDecrement(&_M_currentSize); + _Result = postponed; + + // Save off the message id from this source to later try + // and reserve/consume when more space is free. + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id(); + } + + async_send(NULL); + } + else + { + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + // Didn't get a message so need to decrement. + _InterlockedDecrement(&_M_currentSize); + _Result = missed; + async_send(NULL); + } + } + + return _Result; + } + + /// + /// Synchronously sends a message to this block. When this function completes the message will + /// already have propagated into the block. + /// + /// + /// A pointer to the message. + /// + /// + /// A pointer to the source block offering the message. + /// + /// + /// An indication of what the target decided to do with the message. + /// + virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) + { + message_status _Result = accepted; + + // Check current capacity. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + // Postpone the message, buffer is full. + _InterlockedDecrement(&_M_currentSize); + _Result = postponed; + + // Save off the message id from this source to later try + // and reserve/consume when more space is free. + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id(); + } + + async_send(NULL); + } + else + { + // + // Accept the message being propagated + // Note: depending on the source block propagating the message + // this may not necessarily be the same message (pMessage) first + // passed into the function. + // + _PMessage = _PSource->accept(_PMessage->msg_id(), this); + + if (_PMessage != NULL) + { + async_send(_PMessage); + } + else + { + // Didn't get a message so need to decrement. + _InterlockedDecrement(&_M_currentSize); + _Result = missed; + async_send(NULL); + } + } + + return _Result; + } + + /// + /// Accepts an offered message by the source, transferring ownership to the caller. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + virtual message<_Type> * accept_message(runtime_object_identity _MsgId) + { + // + // Peek at the head message in the message buffer. If the Ids match + // dequeue and transfer ownership + // + message<_Type> * _Msg = NULL; + + if (_M_messageBuffer._Is_head(_MsgId)) + { + _Msg = _M_messageBuffer._Dequeue(); + + // Give preference to any previously postponed messages + // before decrementing current size. + if(!try_consume_msg()) + { + _InterlockedDecrement(&_M_currentSize); + } + } + + return _Msg; + } + + /// + /// Try to reserve and consume a message from list of saved message ids. + /// + /// + /// True if a message was sucessfully consumed, false otherwise. + /// + bool try_consume_msg() + { + runtime_object_identity _ReservedId = -1; + ISource<_Type> * _PSource = NULL; + + // Walk through source links seeing if any saved ids exist. + bool _ConsumedMsg = true; + while(_ConsumedMsg) + { + source_iterator _Iter = _M_connectedSources.begin(); + { + critical_section::scoped_lock scopedLock(_M_savedIdsLock); + for (; *_Iter != NULL; ++_Iter) + { + _PSource = *_Iter; + std::map *, runtime_object_identity>::iterator _MapIter; + if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end()) + { + _ReservedId = _MapIter->second; + _M_savedSourceMsgIds.erase(_MapIter); + break; + } + } + } + + // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster. + if(_ReservedId != -1) + { + if(_PSource->reserve(_ReservedId, this)) + { + message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this); + async_send(_ConsumedMsg); + return true; + } + // Reserve failed go or link was removed, + // go back and try and find a different msg id. + else + { + continue; + } + } + + // If this point is reached the map of source ids was empty. + break; + } + + return false; + } + + /// + /// Reserves a message previously offered by the source. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A Boolean indicating whether the reservation worked or not. + /// + /// + /// After 'reserve' is called, either 'consume' or 'release' must be called. + /// + virtual bool reserve_message(runtime_object_identity _MsgId) + { + // Allow reservation if this is the head message + return _M_messageBuffer._Is_head(_MsgId); + } + + /// + /// Consumes a message that was reserved previously. + /// + /// + /// The runtime object identity of the message. + /// + /// + /// A pointer to the message that the caller now has ownership of. + /// + /// + /// Similar to 'accept', but is always preceded by a call to 'reserve'. + /// + virtual message<_Type> * consume_message(runtime_object_identity _MsgId) + { + // By default, accept the message + return accept_message(_MsgId); + } + + /// + /// Releases a previous message reservation. + /// + /// + /// The runtime object identity of the message. + /// + virtual void release_message(runtime_object_identity _MsgId) + { + // The head message is the one reserved. + if (!_M_messageBuffer._Is_head(_MsgId)) + { + throw message_not_found(); + } + } + + /// + /// Resumes propagation after a reservation has been released + /// + virtual void resume_propagation() + { + // If there are any messages in the buffer, propagate them out + if (_M_messageBuffer._Count() > 0) + { + async_send(NULL); + } + } + + /// + /// Notification that a target was linked to this source. + /// + /// + /// A pointer to the newly linked target. + /// + virtual void link_target_notification(ITarget<_Type> * _PTarget) + { + // If the message queue is blocked due to reservation + // there is no need to do any message propagation + if (_M_pReservedFor != NULL) + { + return; + } + + message<_Type> * _Msg = _M_messageBuffer._Peek(); + + if (_Msg != NULL) + { + // Propagate the head message to the new target + message_status _Status = _PTarget->propagate(_Msg, this); + + if (_Status == accepted) + { + // The target accepted the message, restart propagation. + propagate_to_any_targets(NULL); + } + + // If the status is anything other than accepted, then leave + // the message queue blocked. + } + } + + /// + /// Takes the message and propagates it to all the targets of this bounded_buffer. + /// This is called from async_send. + /// + /// + /// A pointer to a new message. + /// + virtual void propagate_to_any_targets(message<_Type> * _PMessage) + { + // Enqueue pMessage to the internal message buffer if it is non-NULL. + // pMessage can be NULL if this LWT was the result of a Repropagate call + // out of a Consume or Release (where no new message is queued up, but + // everything remaining in the bounded buffer needs to be propagated out) + if (_PMessage != NULL) + { + _M_messageBuffer._Enqueue(_PMessage); + + // If the incoming pMessage is not the head message, we can safely assume that + // the head message is blocked and waiting on Consume(), Release() or a new + // link_target() and cannot be propagated out. + if (_M_messageBuffer._Is_head(_PMessage->msg_id())) + { + _Propagate_priority_order(); + } + } + else + { + // While current size is less than capacity try to consume + // any previously offered ids. + bool _ConsumedMsg = true; + while(_ConsumedMsg) + { + // Assume a message will be found to successfully consume in the + // saved ids, if not this will be decremented afterwards. + if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity) + { + break; + } + + _ConsumedMsg = try_consume_msg(); + } + + // Decrement the current size, we broke out of the previous loop + // because we reached capacity or there were no more messages to consume. + _InterlockedDecrement(&_M_currentSize); + } + } + +private: + + /// + /// Attempts to propagate out any messages currently in the block. + /// + void _Propagate_priority_order() + { + message<_Target_type> * _Msg = _M_messageBuffer._Peek(); + + // If someone has reserved the _Head message, don't propagate anymore + if (_M_pReservedFor != NULL) + { + return; + } + + while (_Msg != NULL) + { + message_status _Status = declined; + + // Always start from the first target that linked. + for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) + { + ITarget<_Target_type> * _PTarget = *_Iter; + _Status = _PTarget->propagate(_Msg, this); + + // Ownership of message changed. Do not propagate this + // message to any other target. + if (_Status == accepted) + { + break; + } + + // If the target just propagated to reserved this message, stop + // propagating it to others. + if (_M_pReservedFor != NULL) + { + break; + } + } + + // If status is anything other than accepted, then the head message + // was not propagated out. Thus, nothing after it in the queue can + // be propagated out. Cease propagation. + if (_Status != accepted) + { + break; + } + + // Get the next message + _Msg = _M_messageBuffer._Peek(); + } + } + + /// + /// Message buffer used to store messages. + /// + ::Concurrency::details::_Queue> _M_messageBuffer; + + /// + /// Maximum number of messages bounded_buffer can hold. + /// + const size_t _M_capacity; + + /// + /// Current number of messages in bounded_buffer. + /// + volatile long _M_currentSize; + + /// + /// Lock used to guard saved message ids map. + /// + critical_section _M_savedIdsLock; + + /// + /// Map of source links to saved message ids. + /// + std::map *, runtime_object_identity> _M_savedSourceMsgIds; + + // + // Hide assignment operator and copy constructor + // + bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator + bounded_buffer(bounded_buffer const &); // no copy constructor +}; +} \ No newline at end of file diff --git a/common/concrt/scoped_oversubscription_token.h b/common/concrt/scoped_oversubscription_token.h new file mode 100644 index 000000000..71d6e35af --- /dev/null +++ b/common/concrt/scoped_oversubscription_token.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace Concurrency { + +/// +/// An RAII style wrapper around Concurrency::Context::Oversubscribe, +/// useful for annotating known blocking calls +/// +class scoped_oversubcription_token +{ +public: + scoped_oversubcription_token() + { + Concurrency::Context::CurrentContext()->Oversubscribe(true); + } + ~scoped_oversubcription_token() + { + Concurrency::Context::CurrentContext()->Oversubscribe(false); + } +}; + +} \ No newline at end of file diff --git a/common/diagnostics/graph.cpp b/common/diagnostics/graph.cpp index 5378f5156..213610aed 100644 --- a/common/diagnostics/graph.cpp +++ b/common/diagnostics/graph.cpp @@ -269,7 +269,7 @@ struct graph::implementation : public drawable implementation(const printer& parent_printer) : parent_printer_(parent_printer) - , name_(parent_printer_ ? narrow(parent_printer_()) : "") + , name_("") , counter_(0){} void update(const std::string& name, double value) @@ -356,14 +356,20 @@ private: implementation& operator=(implementation&); }; -graph::graph(const std::string& name) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr) +graph::graph(const std::string& name, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr) { - if(impl_) - context::register_drawable(impl_); + if(start) + graph::start(); } -graph::graph(const printer& parent_printer) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr) +graph::graph(const printer& parent_printer, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr) { + if(start) + graph::start(); +} + +void graph::start() +{ if(impl_) context::register_drawable(impl_); } @@ -424,13 +430,13 @@ void graph::add_guide(const std::string& name, double value) } } -safe_ptr create_graph(const std::string& name) +safe_ptr create_graph(const std::string& name, bool start) { - return safe_ptr(new graph(name)); + return safe_ptr(new graph(name, start)); } -safe_ptr create_graph(const printer& parent_printer) +safe_ptr create_graph(const printer& parent_printer, bool start) { - return safe_ptr(new graph(parent_printer)); + return safe_ptr(new graph(parent_printer, start)); } diff --git a/common/diagnostics/graph.h b/common/diagnostics/graph.h index 3f54c0a7c..dbba43fce 100644 --- a/common/diagnostics/graph.h +++ b/common/diagnostics/graph.h @@ -51,11 +51,12 @@ struct color class graph { - friend safe_ptr create_graph(const std::string& name); - friend safe_ptr create_graph(const printer& parent_printer); - graph(const std::string& name); - graph(const printer& parent_printer); + friend safe_ptr create_graph(const std::string& name, bool start); + friend safe_ptr create_graph(const printer& parent_printer, bool start); + graph(const std::string& name, bool start = true); + graph(const printer& parent_printer, bool start = true); public: + void start(); void update_value(const std::string& name, double value); void set_value(const std::string& name, double value); void set_color(const std::string& name, color c); @@ -66,8 +67,8 @@ private: std::shared_ptr impl_; }; -safe_ptr create_graph(const std::string& name); -safe_ptr create_graph(const printer& parent_printer); +safe_ptr create_graph(const std::string& name, bool start = true); +safe_ptr create_graph(const printer& parent_printer, bool start = true); //namespace v2 //{ diff --git a/core/mixer/image/image_mixer.cpp b/core/mixer/image/image_mixer.cpp index a93bcd8de..03280490f 100644 --- a/core/mixer/image/image_mixer.cpp +++ b/core/mixer/image/image_mixer.cpp @@ -285,6 +285,11 @@ public: { return make_safe(ogl_, tag, desc); } + + boost::unique_future> create_frame2(const void* tag, const pixel_format_desc& desc) + { + return ogl_.begin_invoke([=]{return make_safe(ogl_, tag, desc);}, high_priority); + } }; image_mixer::image_mixer(ogl_device& ogl, const video_format_desc& format_desc) : impl_(new implementation(ogl, format_desc)){} @@ -293,6 +298,7 @@ void image_mixer::visit(write_frame& frame){impl_->visit(frame);} void image_mixer::end(){impl_->end();} boost::unique_future> image_mixer::render(){return impl_->render();} safe_ptr image_mixer::create_frame(const void* tag, const pixel_format_desc& desc){return impl_->create_frame(tag, desc);} +boost::unique_future> image_mixer::create_frame2(const void* tag, const pixel_format_desc& desc){return impl_->create_frame2(tag, desc);} void image_mixer::begin_layer(blend_mode::type blend_mode){impl_->begin_layer(blend_mode);} void image_mixer::end_layer(){impl_->end_layer();} image_mixer& image_mixer::operator=(image_mixer&& other) diff --git a/core/mixer/image/image_mixer.h b/core/mixer/image/image_mixer.h index 266c3d43b..436be0249 100644 --- a/core/mixer/image/image_mixer.h +++ b/core/mixer/image/image_mixer.h @@ -26,7 +26,6 @@ #include #include - #include namespace caspar { namespace core { @@ -54,6 +53,7 @@ public: boost::unique_future> render(); safe_ptr create_frame(const void* tag, const pixel_format_desc& format); + boost::unique_future> create_frame2(const void* tag, const pixel_format_desc& format); private: struct implementation; diff --git a/core/mixer/mixer.cpp b/core/mixer/mixer.cpp index 8353d4170..1f566e77d 100644 --- a/core/mixer/mixer.cpp +++ b/core/mixer/mixer.cpp @@ -159,6 +159,11 @@ public: { return image_mixer_.create_frame(tag, desc); } + + boost::unique_future> create_frame2(const void* tag, const core::pixel_format_desc& desc) + { + return image_mixer_.create_frame2(tag, desc); + } void set_transform(int index, const frame_transform& transform, unsigned int mix_duration, const std::wstring& tween) { @@ -207,14 +212,7 @@ mixer::mixer(video_channel_context& video_channel) : impl_(new implementation(vi safe_ptr mixer::execute(const std::map>& frames){ return impl_->execute(frames);} core::video_format_desc mixer::get_video_format_desc() const { return impl_->channel_.get_format_desc(); } safe_ptr mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); } -safe_ptr mixer::create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt) -{ - // Create bgra frame - core::pixel_format_desc desc; - desc.pix_fmt = pix_fmt; - desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4)); - return create_frame(tag, desc); -} +boost::unique_future> mixer::create_frame2(const void* video_stream_tag, const pixel_format_desc& desc){ return impl_->create_frame2(video_stream_tag, desc); } void mixer::set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration, const std::wstring& tween){impl_->set_transform(index, transform, mix_duration, tween);} void mixer::apply_frame_transform(int index, const std::function& transform, unsigned int mix_duration, const std::wstring& tween){impl_->apply_transform(index, transform, mix_duration, tween);} void mixer::clear_transforms(){impl_->clear_transforms();} diff --git a/core/mixer/mixer.h b/core/mixer/mixer.h index f7bd73efe..a2948c025 100644 --- a/core/mixer/mixer.h +++ b/core/mixer/mixer.h @@ -49,9 +49,9 @@ public: safe_ptr execute(const std::map>& frames); // nothrow - safe_ptr create_frame(const void* tag, const core::pixel_format_desc& desc); - safe_ptr create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt = core::pixel_format::bgra); - + safe_ptr create_frame(const void* tag, const core::pixel_format_desc& desc); + boost::unique_future> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc); + core::video_format_desc get_video_format_desc() const; // nothrow diff --git a/core/producer/frame/frame_factory.h b/core/producer/frame/frame_factory.h index c8880155b..efaa38b54 100644 --- a/core/producer/frame/frame_factory.h +++ b/core/producer/frame/frame_factory.h @@ -20,23 +20,70 @@ #pragma once #include "pixel_format.h" +#include "../../video_format.h" #include +#include + +#include #include +#include namespace caspar { namespace core { class write_frame; -struct pixel_format_desc; struct video_format_desc; struct frame_factory : boost::noncopyable { virtual safe_ptr create_frame(const void* video_stream_tag, const pixel_format_desc& desc) = 0; - virtual safe_ptr create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra) = 0; + virtual safe_ptr create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra) + { + // Create bgra frame + core::pixel_format_desc desc; + desc.pix_fmt = pix_fmt; + desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4)); + return create_frame(video_stream_tag, desc); + } + virtual boost::unique_future> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc) = 0; + virtual video_format_desc get_video_format_desc() const = 0; // nothrow }; + +class concrt_frame_factory : public frame_factory +{ + safe_ptr factory_; +public: + concrt_frame_factory(const safe_ptr& factory) + : factory_(factory) + { + } + + virtual safe_ptr create_frame(const void* tag, const pixel_format_desc& desc) + { + auto frame = factory_->create_frame2(tag, desc); + + Concurrency::wait(0); + + if(!frame.has_value()) + { + Concurrency::scoped_oversubcription_token oversubscribe; + frame.wait(); + } + return frame.get(); + } + + virtual boost::unique_future> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc) + { + return factory_->create_frame2(video_stream_tag, desc); + } + + virtual video_format_desc get_video_format_desc() const + { + return factory_->get_video_format_desc(); + } +}; }} \ No newline at end of file diff --git a/modules/decklink/interop/DeckLinkAPI_h.h b/modules/decklink/interop/DeckLinkAPI_h.h index 4a466545a..11230eccc 100644 --- a/modules/decklink/interop/DeckLinkAPI_h.h +++ b/modules/decklink/interop/DeckLinkAPI_h.h @@ -4,7 +4,7 @@ /* File created by MIDL compiler version 7.00.0555 */ -/* at Wed Sep 21 22:02:53 2011 +/* at Tue Oct 18 00:55:44 2011 */ /* Compiler settings for interop\DeckLinkAPI.idl: Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 diff --git a/modules/decklink/interop/DeckLinkAPI_i.c b/modules/decklink/interop/DeckLinkAPI_i.c index a31a6cd25..1b61c2c29 100644 --- a/modules/decklink/interop/DeckLinkAPI_i.c +++ b/modules/decklink/interop/DeckLinkAPI_i.c @@ -6,7 +6,7 @@ /* File created by MIDL compiler version 7.00.0555 */ -/* at Wed Sep 21 22:02:53 2011 +/* at Tue Oct 18 00:55:44 2011 */ /* Compiler settings for interop\DeckLinkAPI.idl: Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 diff --git a/modules/ffmpeg/StdAfx.h b/modules/ffmpeg/StdAfx.h index fb3987712..9dbd5c4f4 100644 --- a/modules/ffmpeg/StdAfx.h +++ b/modules/ffmpeg/StdAfx.h @@ -41,9 +41,8 @@ #include #include -#include -#include -#include +#include +#include #include #include diff --git a/modules/ffmpeg/producer/audio/audio_decoder.cpp b/modules/ffmpeg/producer/audio/audio_decoder.cpp index 3f5fe7f72..a8bb817b8 100644 --- a/modules/ffmpeg/producer/audio/audio_decoder.cpp +++ b/modules/ffmpeg/producer/audio/audio_decoder.cpp @@ -20,17 +20,15 @@ #include "../../stdafx.h" #include "audio_decoder.h" - #include "audio_resampler.h" +#include "../util.h" #include "../../ffmpeg_error.h" #include #include -#include - #if defined(_MSC_VER) #pragma warning (push) #pragma warning (disable : 4244) @@ -46,21 +44,30 @@ extern "C" namespace caspar { namespace ffmpeg { -struct audio_decoder::implementation : boost::noncopyable -{ +struct audio_decoder::implementation : public Concurrency::agent, boost::noncopyable +{ + audio_decoder::token_t& active_token_; + audio_decoder::source_t& source_; + audio_decoder::target_t& target_; + std::shared_ptr codec_context_; const core::video_format_desc format_desc_; int index_; std::unique_ptr resampler_; std::vector> buffer1_; - - std::queue> packets_; - + int64_t nb_frames_; public: - explicit implementation(const safe_ptr& context, const core::video_format_desc& format_desc) - : format_desc_(format_desc) + explicit implementation(audio_decoder::token_t& active_token, + audio_decoder::source_t& source, + audio_decoder::target_t& target, + const safe_ptr& context, + const core::video_format_desc& format_desc) + : active_token_(active_token) + , source_(source) + , target_(target) + , format_desc_(format_desc) , nb_frames_(0) { try @@ -85,55 +92,51 @@ public: CASPAR_LOG_CURRENT_EXCEPTION(); CASPAR_LOG(warning) << "[audio_decoder] Failed to open audio-stream. Running without audio."; } - } - void push(const std::shared_ptr& packet) - { - if(packet && packet->stream_index != index_) - return; + start(); + } - packets_.push(packet); - } - - std::vector> poll() + ~implementation() { - std::vector> result; - - if(packets_.empty()) - return result; - - if(!codec_context_) - return empty_poll(); - - auto packet = packets_.front(); + agent::wait(this); + } - if(packet) + virtual void run() + { + try { - result.push_back(decode(*packet)); - if(packet->size == 0) - packets_.pop(); + while(Concurrency::receive(active_token_)) + { + auto packet = Concurrency::receive(source_); + if(packet == eof_packet()) + { + Concurrency::send(target_, eof_audio()); + break; + } + + if(packet == loop_packet()) + { + if(codec_context_) + avcodec_flush_buffers(codec_context_.get()); + Concurrency::send(target_, loop_audio()); + } + else if(!codec_context_) + Concurrency::send(target_, empty_audio()); + else + Concurrency::send(target_, decode(*packet)); + } + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); } - else - { - avcodec_flush_buffers(codec_context_.get()); - result.push_back(nullptr); - packets_.pop(); - } - - return result; - } - std::vector> empty_poll() - { - auto packet = packets_.front(); - packets_.pop(); + std::shared_ptr packet; + Concurrency::try_receive(source_, packet); - if(!packet) - return boost::assign::list_of(nullptr); - - return boost::assign::list_of(std::make_shared(format_desc_.audio_samples_per_frame, 0)); + done(); } - + std::shared_ptr decode(AVPacket& pkt) { buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2); @@ -154,17 +157,16 @@ public: return std::make_shared(samples, samples + n_samples); } - - bool ready() const - { - return !packets_.empty(); - } }; -audio_decoder::audio_decoder(const safe_ptr& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){} -void audio_decoder::push(const std::shared_ptr& packet){impl_->push(packet);} -bool audio_decoder::ready() const{return impl_->ready();} -std::vector> audio_decoder::poll(){return impl_->poll();} +audio_decoder::audio_decoder(token_t& active_token, + source_t& source, + target_t& target, + const safe_ptr& context, + const core::video_format_desc& format_desc) + : impl_(new implementation(active_token, source, target, context, format_desc)) +{ +} int64_t audio_decoder::nb_frames() const{return impl_->nb_frames_;} }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/audio/audio_decoder.h b/modules/ffmpeg/producer/audio/audio_decoder.h index e446bd20f..b70a307b3 100644 --- a/modules/ffmpeg/producer/audio/audio_decoder.h +++ b/modules/ffmpeg/producer/audio/audio_decoder.h @@ -25,6 +25,7 @@ #include +#include #include struct AVPacket; @@ -43,15 +44,21 @@ namespace ffmpeg { class audio_decoder : boost::noncopyable { public: - explicit audio_decoder(const safe_ptr& context, const core::video_format_desc& format_desc); - - void push(const std::shared_ptr& packet); - bool ready() const; - std::vector> poll(); + typedef Concurrency::ISource token_t; + typedef Concurrency::ISource> source_t; + typedef Concurrency::ITarget> target_t; + + explicit audio_decoder(token_t& active_token, + source_t& source, + target_t& target, + const safe_ptr& context, + const core::video_format_desc& format_desc); + int64_t nb_frames() const; private: + struct implementation; safe_ptr impl_; }; diff --git a/modules/ffmpeg/producer/audio/audio_resampler.cpp b/modules/ffmpeg/producer/audio/audio_resampler.cpp index 179d09cc5..9f953f130 100644 --- a/modules/ffmpeg/producer/audio/audio_resampler.cpp +++ b/modules/ffmpeg/producer/audio/audio_resampler.cpp @@ -62,7 +62,7 @@ struct audio_resampler::implementation std::vector> resample(std::vector>&& data) { - if(resampler_) + if(resampler_ && !data.empty()) { buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2); auto ret = audio_resample(resampler_.get(), diff --git a/modules/ffmpeg/producer/audio/audio_resampler.h b/modules/ffmpeg/producer/audio/audio_resampler.h index e25d06bf5..c03491cbe 100644 --- a/modules/ffmpeg/producer/audio/audio_resampler.h +++ b/modules/ffmpeg/producer/audio/audio_resampler.h @@ -4,6 +4,8 @@ #include +#include + namespace caspar { namespace ffmpeg { class audio_resampler diff --git a/modules/ffmpeg/producer/ffmpeg_producer.cpp b/modules/ffmpeg/producer/ffmpeg_producer.cpp index 480e95b8d..ee30e2bbc 100644 --- a/modules/ffmpeg/producer/ffmpeg_producer.cpp +++ b/modules/ffmpeg/producer/ffmpeg_producer.cpp @@ -27,6 +27,7 @@ #include "audio/audio_decoder.h" #include "video/video_decoder.h" +#include #include #include #include @@ -44,100 +45,84 @@ #include #include -#include +#include +#include namespace caspar { namespace ffmpeg { + +template +struct buffer_alias +{ + typedef Concurrency::bounded_buffer> type; +}; struct ffmpeg_producer : public core::frame_producer -{ - const std::wstring filename_; - - const safe_ptr graph_; - boost::timer frame_timer_; - boost::timer video_timer_; - boost::timer audio_timer_; +{ + const std::wstring filename_; + const int start_; + const bool loop_; + const size_t length_; + + buffer_alias::type video_packets_; + buffer_alias::type audio_packets_; + buffer_alias::type video_frames_; + buffer_alias::type audio_buffers_; + buffer_alias::type muxed_frames_; + Concurrency::overwrite_buffer active_token_; + + const safe_ptr graph_; - const safe_ptr frame_factory_; - const core::video_format_desc format_desc_; - - input input_; - video_decoder video_decoder_; - audio_decoder audio_decoder_; - double fps_; - frame_muxer muxer_; - - const int start_; - const bool loop_; - const size_t length_; + input input_; + video_decoder video_decoder_; + audio_decoder audio_decoder_; + frame_muxer2 muxer_; - safe_ptr last_frame_; - - const size_t width_; - const size_t height_; - bool is_progressive_; + safe_ptr last_frame_; public: explicit ffmpeg_producer(const safe_ptr& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) : filename_(filename) - , graph_(diagnostics::create_graph([this]{return print();})) - , frame_factory_(frame_factory) - , format_desc_(frame_factory->get_video_format_desc()) - , input_(graph_, filename_, loop, start, length) - , video_decoder_(input_.context(), frame_factory, filter) - , audio_decoder_(input_.context(), frame_factory->get_video_format_desc()) - , fps_(video_decoder_.fps()) - , muxer_(fps_, frame_factory) , start_(start) , loop_(loop) , length_(length) + , video_packets_(25) + , audio_packets_(25) + , video_frames_(2) + , audio_buffers_(2) + , muxed_frames_(2) + , graph_(diagnostics::create_graph([this]{return print();}, false)) + , input_(active_token_, video_packets_, audio_packets_, graph_, filename_, loop, start, length) + , video_decoder_(active_token_, video_packets_, video_frames_, input_.context(), frame_factory->get_video_format_desc().fps, filter) + , audio_decoder_(active_token_, audio_packets_, audio_buffers_, input_.context(), frame_factory->get_video_format_desc()) + , muxer_(active_token_, video_frames_, audio_buffers_, muxed_frames_, video_decoder_.fps(), frame_factory) , last_frame_(core::basic_frame::empty()) - , width_(video_decoder_.width()) - , height_(video_decoder_.height()) - , is_progressive_(true) { - graph_->add_guide("frame-time", 0.5); - graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f)); graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); - - // Do some pre-work in order to not block rendering thread for initialization and allocations. + graph_->start(); - push_packets(); - auto video_frames = video_decoder_.poll(); - if(!video_frames.empty()) - { - auto& video_frame = video_frames.front(); - auto desc = get_pixel_format_desc(static_cast(video_frame->format), video_frame->width, video_frame->height); - if(desc.pix_fmt == core::pixel_format::invalid) - get_pixel_format_desc(PIX_FMT_BGRA, video_frame->width, video_frame->height); - - for(int n = 0; n < 3; ++n) - frame_factory->create_frame(this, desc); - } - BOOST_FOREACH(auto& video, video_frames) - muxer_.push(video, 0); + Concurrency::send(active_token_, true); + } + + ~ffmpeg_producer() + { + Concurrency::send(active_token_, false); + std::shared_ptr frame; + Concurrency::try_receive(muxed_frames_, frame); } - + virtual safe_ptr receive(int hints) { auto frame = core::basic_frame::late(); - frame_timer_.restart(); - - for(int n = 0; n < 64 && muxer_.empty(); ++n) - decode_frame(hints); - - graph_->update_value("frame-time", static_cast(frame_timer_.elapsed()*format_desc_.fps*0.5)); - - if(!muxer_.empty()) - frame = last_frame_ = muxer_.pop(); - else - { - if(input_.eof()) - return core::basic_frame::eof(); - else - graph_->add_tag("underflow"); + try + { + frame = last_frame_ = safe_ptr(Concurrency::receive(muxed_frames_, 8)); } - + catch(Concurrency::operation_timed_out&) + { + graph_->add_tag("underflow"); + } + return frame; } @@ -145,51 +130,8 @@ public: { return disable_audio(last_frame_); } - - void push_packets() - { - for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) - { - std::shared_ptr pkt; - if(input_.try_pop(pkt)) - { - video_decoder_.push(pkt); - audio_decoder_.push(pkt); - } - } - } - - void decode_frame(int hints) - { - push_packets(); - - tbb::parallel_invoke( - [&] - { - if(muxer_.video_ready()) - return; - - auto video_frames = video_decoder_.poll(); - BOOST_FOREACH(auto& video, video_frames) - { - is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_; - muxer_.push(video, hints); - } - }, - [&] - { - if(muxer_.audio_ready()) - return; - - auto audio_samples = audio_decoder_.poll(); - BOOST_FOREACH(auto& audio, audio_samples) - muxer_.push(audio); - }); - - muxer_.commit(); - } - - virtual int64_t nb_frames() const + + virtual int64_t nb_frames() const { if(loop_) return std::numeric_limits::max(); @@ -215,8 +157,8 @@ public: virtual std::wstring print() const { return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" - + boost::lexical_cast(width_) + L"x" + boost::lexical_cast(height_) - + (is_progressive_ ? L"p" : L"i") + boost::lexical_cast(is_progressive_ ? fps_ : 2.0 * fps_) + + boost::lexical_cast(video_decoder_.width()) + L"x" + boost::lexical_cast(video_decoder_.height()) + + (video_decoder_.is_progressive() ? L"p" : L"i") + boost::lexical_cast(video_decoder_.is_progressive() ? video_decoder_.fps() : 2.0 * video_decoder_.fps()) + L"]"; } }; diff --git a/modules/ffmpeg/producer/filter/parallel_yadif.cpp b/modules/ffmpeg/producer/filter/parallel_yadif.cpp index 0f2f04f1a..892f17572 100644 --- a/modules/ffmpeg/producer/filter/parallel_yadif.cpp +++ b/modules/ffmpeg/producer/filter/parallel_yadif.cpp @@ -16,8 +16,8 @@ extern "C" #pragma warning (pop) #endif -#include -#include +#include +#include #include @@ -67,10 +67,9 @@ void parallel_yadif_filter_line(parallel_yadif_context& ctx, uint8_t *dst, uint8 if(ctx.index == ctx.last_index) { - tbb::parallel_for(tbb::blocked_range(0, ctx.index), [=](const tbb::blocked_range& r) + Concurrency::parallel_for(0, ctx.index, [=](size_t n) { - for(auto n = r.begin(); n != r.end(); ++n) - org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode); + org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode); }); ctx.index = 0; } @@ -78,7 +77,7 @@ void parallel_yadif_filter_line(parallel_yadif_context& ctx, uint8_t *dst, uint8 namespace caspar { namespace ffmpeg { -tbb::concurrent_bounded_queue parallel_line_func_pool; +Concurrency::concurrent_queue parallel_line_func_pool; std::array ctxs; #define RENAME(a) f ## a diff --git a/modules/ffmpeg/producer/frame_muxer.cpp b/modules/ffmpeg/producer/frame_muxer.cpp index 807b86e82..c9b750a2f 100644 --- a/modules/ffmpeg/producer/frame_muxer.cpp +++ b/modules/ffmpeg/producer/frame_muxer.cpp @@ -35,6 +35,9 @@ extern "C" #include #include +#include +#include + #include #include #include @@ -419,4 +422,365 @@ bool frame_muxer::video_ready() const{return impl_->video_ready();} bool frame_muxer::audio_ready() const{return impl_->audio_ready();} int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);} + +struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable +{ + frame_muxer2::token_t& active_token_; + frame_muxer2::video_source_t& video_source_; + frame_muxer2::audio_source_t& audio_source_; + frame_muxer2::target_t& target_; + + std::deque>> video_streams_; + std::deque audio_streams_; + std::deque> frame_buffer_; + display_mode::type display_mode_; + const double in_fps_; + const video_format_desc format_desc_; + bool auto_transcode_; + + size_t audio_sample_count_; + size_t video_frame_count_; + + size_t processed_audio_sample_count_; + size_t processed_video_frame_count_; + + filter filter_; + safe_ptr frame_factory_; + + implementation(frame_muxer2::token_t& active_token, + frame_muxer2::video_source_t& video_source, + frame_muxer2::audio_source_t& audio_source, + frame_muxer2::target_t& target, + double in_fps, + const safe_ptr& frame_factory) + : active_token_(active_token) + , video_source_(video_source) + , audio_source_(audio_source) + , target_(target) + , video_streams_(1) + , audio_streams_(1) + , display_mode_(display_mode::invalid) + , in_fps_(in_fps) + , format_desc_(frame_factory->get_video_format_desc()) + , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false)) + , audio_sample_count_(0) + , video_frame_count_(0) + , frame_factory_(make_safe(frame_factory)) + { + start(); + } + + ~implementation() + { + agent::wait(this); + } + + virtual void run() + { + try + { + while(Concurrency::receive(active_token_)) + { + Concurrency::parallel_invoke( + [&] + { + while(!video_ready()) + { + auto video = Concurrency::receive(video_source_); + if(video == eof_video()) + break; + push(video, 0); + } + }, + [&] + { + while(!audio_ready()) + { + auto audio = Concurrency::receive(audio_source_); + if(audio == eof_audio()) + break; + push(audio); + } + }); + + if(!video_ready() || !audio_ready()) + { + Concurrency::send(target_, std::shared_ptr(core::basic_frame::eof())); + break; + } + + commit(); + + if(!frame_buffer_.empty()) + { + Concurrency::send(target_, std::shared_ptr(frame_buffer_.front())); + frame_buffer_.pop_front(); + } + } + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } + + std::shared_ptr video; + Concurrency::try_receive(video_source_, video); + std::shared_ptr audio; + Concurrency::try_receive(audio_source_, audio); + + done(); + } + + void push(const std::shared_ptr& video_frame, int hints) + { + if(video_frame == loop_video()) + { + CASPAR_LOG(debug) << L"video-frame-count: " << static_cast(video_frame_count_); + video_frame_count_ = 0; + video_streams_.push_back(std::queue>()); + return; + } + + if(video_frame == empty_video()) + { + video_streams_.back().push(make_safe(this)); + ++video_frame_count_; + display_mode_ = display_mode::simple; + return; + } + + if(display_mode_ == display_mode::invalid) + { + if(auto_transcode_) + { + auto in_mode = get_mode(*video_frame); + display_mode_ = get_display_mode(in_mode, in_fps_, format_desc_.field_mode, format_desc_.fps); + + if(display_mode_ == display_mode::simple && in_mode != core::field_mode::progressive && format_desc_.field_mode != core::field_mode::progressive && video_frame->height != static_cast(format_desc_.height)) + display_mode_ = display_mode::deinterlace_bob_reinterlace; // The frame will most likely be scaled, we need to deinterlace->reinterlace + + if(display_mode_ == display_mode::deinterlace) + filter_ = filter(L"YADIF=0:-1"); + else if(display_mode_ == display_mode::deinterlace_bob || display_mode_ == display_mode::deinterlace_bob_reinterlace) + filter_ = filter(L"YADIF=1:-1"); + } + else + display_mode_ = display_mode::simple; + + if(display_mode_ == display_mode::invalid) + { + CASPAR_LOG(warning) << L"[frame_muxer] Failed to detect display-mode."; + display_mode_ = display_mode::simple; + } + + CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode_); + } + + + if(hints & core::frame_producer::ALPHA_HINT) + video_frame->format = make_alpha_format(video_frame->format); + + auto format = video_frame->format; + if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8 + video_frame->format = PIX_FMT_GRAY8; + + BOOST_FOREACH(auto& av_frame, filter_.execute(video_frame)) + { + av_frame->format = format; + + auto frame = make_write_frame(this, av_frame, frame_factory_, hints); + + // Fix field-order if needed + if(frame->get_type() == core::field_mode::lower && format_desc_.field_mode == core::field_mode::upper) + frame->get_frame_transform().fill_translation[1] += 1.0/static_cast(format_desc_.height); + else if(frame->get_type() == core::field_mode::upper && format_desc_.field_mode == core::field_mode::lower) + frame->get_frame_transform().fill_translation[1] -= 1.0/static_cast(format_desc_.height); + + video_streams_.back().push(frame); + ++video_frame_count_; + } + + if(video_streams_.back().size() > 8) + BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data.")); + } + + void push(std::shared_ptr audio_samples) + { + if(audio_samples == loop_audio()) + { + CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame; + audio_streams_.push_back(core::audio_buffer()); + audio_sample_count_ = 0; + return; + } + + if(audio_samples == empty_audio()) + audio_samples = std::make_shared(format_desc_.audio_samples_per_frame); + + audio_sample_count_ += audio_samples->size(); + + boost::range::push_back(audio_streams_.back(), *audio_samples); + + if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame) + BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data.")); + } + + size_t size() const + { + return frame_buffer_.size(); + } + + safe_ptr pop_video() + { + auto frame = video_streams_.front().front(); + video_streams_.front().pop(); + + return frame; + } + + core::audio_buffer pop_audio() + { + CASPAR_VERIFY(audio_streams_.front().size() >= format_desc_.audio_samples_per_frame); + + auto begin = audio_streams_.front().begin(); + auto end = begin + format_desc_.audio_samples_per_frame; + + auto samples = core::audio_buffer(begin, end); + audio_streams_.front().erase(begin, end); + + return samples; + } + + bool video_ready() const + { + return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2()); + } + + bool audio_ready() const + { + return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2()); + } + + bool video_ready2() const + { + switch(display_mode_) + { + case display_mode::deinterlace_bob_reinterlace: + case display_mode::interlace: + return video_streams_.front().size() >= 2; + default: + return !video_streams_.front().empty(); + } + } + + bool audio_ready2() const + { + switch(display_mode_) + { + case display_mode::duplicate: + return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame; + default: + return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame; + } + } + + void commit() + { + if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2())) + { + if(!video_streams_.front().empty() || !audio_streams_.front().empty()) + CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples."; + + video_streams_.pop_front(); + audio_streams_.pop_front(); + } + + if(!video_ready2() || !audio_ready2()) + return; + + switch(display_mode_) + { + case display_mode::simple: return simple(frame_buffer_); + case display_mode::duplicate: return duplicate(frame_buffer_); + case display_mode::half: return half(frame_buffer_); + case display_mode::interlace: return interlace(frame_buffer_); + case display_mode::deinterlace_bob: return simple(frame_buffer_); + case display_mode::deinterlace_bob_reinterlace: return interlace(frame_buffer_); + case display_mode::deinterlace: return simple(frame_buffer_); + default: BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode")); + } + } + + void simple(std::deque>& dest) + { + auto frame1 = pop_video(); + frame1->audio_data() = pop_audio(); + + dest.push_back(frame1); + } + + void duplicate(std::deque>& dest) + { + auto frame = pop_video(); + + auto frame1 = make_safe(*frame); // make a copy + frame1->audio_data() = pop_audio(); + + auto frame2 = frame; + frame2->audio_data() = pop_audio(); + + dest.push_back(frame1); + dest.push_back(frame2); + } + + void half(std::deque>& dest) + { + auto frame1 = pop_video(); + frame1->audio_data() = pop_audio(); + + video_streams_.front().pop(); // Throw away + + dest.push_back(frame1); + } + + void interlace(std::deque>& dest) + { + auto frame1 = pop_video(); + frame1->audio_data() = pop_audio(); + + auto frame2 = pop_video(); + + dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); + } + + int64_t calc_nb_frames(int64_t nb_frames) const + { + switch(display_mode_) + { + case display_mode::interlace: + case display_mode::half: + return nb_frames/2; + case display_mode::duplicate: + case display_mode::deinterlace_bob: + return nb_frames*2; + default: + return nb_frames; + } + } +}; + +frame_muxer2::frame_muxer2(token_t& active_token, + video_source_t& video_source, + audio_source_t& audio_source, + target_t& target, + double in_fps, + const safe_ptr& frame_factory) + : impl_(new implementation(active_token, video_source, audio_source, target, in_fps, frame_factory)) +{ +} +int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const +{ + return impl_->calc_nb_frames(nb_frames); +} + }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/frame_muxer.h b/modules/ffmpeg/producer/frame_muxer.h index 8aa79834c..8c43ff5b2 100644 --- a/modules/ffmpeg/producer/frame_muxer.h +++ b/modules/ffmpeg/producer/frame_muxer.h @@ -6,6 +6,7 @@ #include +#include #include struct AVFrame; @@ -46,4 +47,26 @@ private: safe_ptr impl_; }; +class frame_muxer2 : boost::noncopyable +{ +public: + + typedef Concurrency::ISource token_t; + typedef Concurrency::ISource> video_source_t; + typedef Concurrency::ISource> audio_source_t; + typedef Concurrency::ITarget> target_t; + + frame_muxer2(token_t& active_token, + video_source_t& video_source, + audio_source_t& audio_source, + target_t& target, + double in_fps, + const safe_ptr& frame_factory); + + int64_t calc_nb_frames(int64_t nb_frames) const; +private: + struct implementation; + safe_ptr impl_; +}; + }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/input.cpp b/modules/ffmpeg/producer/input.cpp index c8b32cd4a..2627be0a8 100644 --- a/modules/ffmpeg/producer/input.cpp +++ b/modules/ffmpeg/producer/input.cpp @@ -24,22 +24,22 @@ #include "../stdafx.h" #include "input.h" +#include "util.h" #include "../ffmpeg_error.h" #include "../tbb_avcodec.h" #include +#include #include #include #include -#include #include #include -#include -#include -#include + +#include #if defined(_MSC_VER) #pragma warning (push) @@ -55,13 +55,15 @@ extern "C" #pragma warning (pop) #endif +using namespace Concurrency; + namespace caspar { namespace ffmpeg { static const size_t MAX_BUFFER_COUNT = 100; static const size_t MIN_BUFFER_COUNT = 4; static const size_t MAX_BUFFER_SIZE = 16 * 1000000; -struct input::implementation : boost::noncopyable +struct input::implementation : public Concurrency::agent, boost::noncopyable { std::shared_ptr format_context_; // Destroy this last int default_stream_index_; @@ -74,27 +76,35 @@ struct input::implementation : boost::noncopyable const size_t length_; size_t frame_number_; - tbb::concurrent_bounded_queue> buffer_; - tbb::atomic buffer_size_; - boost::condition_variable buffer_cond_; - boost::mutex buffer_mutex_; + input::token_t& active_token_; + input::target_t& video_target_; + input::target_t& audio_target_; - boost::thread thread_; - tbb::atomic is_running_; - tbb::atomic nb_frames_; tbb::atomic nb_loops_; + int video_index_; + int audio_index_; + public: - explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : graph_(graph) + explicit implementation(input::token_t& active_token, + input::target_t& video_target, + input::target_t& audio_target, + const safe_ptr& graph, + const std::wstring& filename, + bool loop, + size_t start, + size_t length) + : active_token_(active_token) + , video_target_(video_target) + , audio_target_(audio_target) + , graph_(graph) , loop_(loop) , filename_(filename) , start_(start) , length_(length) , frame_number_(0) { - is_running_ = true; nb_frames_ = 0; nb_loops_ = 0; @@ -108,96 +118,58 @@ public: THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print()); default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print()); - + video_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + audio_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0); + if(start_ > 0) seek_frame(start_); - for(int n = 0; n < 16 && !full(); ++n) + for(int n = 0; n < 16; ++n) read_next_packet(); graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); - graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); - graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); - thread_ = boost::thread([this]{run();}); + agent::start(); } ~implementation() { - is_running_ = false; - buffer_cond_.notify_all(); - thread_.join(); - } - - bool try_pop(std::shared_ptr& packet) - { - const bool result = buffer_.try_pop(packet); - - if(result) - { - if(packet) - buffer_size_ -= packet->size; - buffer_cond_.notify_all(); - } - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - - return result; - } - - size_t nb_frames() const - { - return nb_frames_; - } - - size_t nb_loops() const - { - return nb_loops_; + agent::wait(this); } - -private: - void run() - { - caspar::win32_exception::install_handler(); - + virtual void run() + { try { - CASPAR_LOG(info) << print() << " Thread Started."; - - while(is_running_) + while(Concurrency::receive(active_token_)) { + if(!read_next_packet()) { - boost::unique_lock lock(buffer_mutex_); - while(full()) - buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20)); + Concurrency::send(video_target_, eof_packet()); + Concurrency::send(audio_target_, eof_packet()); + break; } - read_next_packet(); - } - - CASPAR_LOG(info) << print() << " Thread Stopped."; + } } catch(...) { CASPAR_LOG_CURRENT_EXCEPTION(); - is_running_ = false; - } + } + + done(); } - - void read_next_packet() + + bool read_next_packet() { int ret = 0; - std::shared_ptr read_packet(new AVPacket, [](AVPacket* p) + auto read_packet = create_packet(); + { - av_free_packet(p); - delete p; - }); - av_init_packet(read_packet.get()); + Concurrency::scoped_oversubcription_token oversubscribe; + ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + } - ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - if(is_eof(ret)) { ++nb_loops_; @@ -221,11 +193,11 @@ private: } else { - is_running_ = false; CASPAR_LOG(trace) << print() << " Stopping."; + return false; } } - else + else if(read_packet->stream_index == video_index_ || read_packet->stream_index == audio_index_) { THROW_ON_ERROR(ret, print(), "av_read_frame"); @@ -247,24 +219,23 @@ private: read_packet->size = size; read_packet->data = data; }); + + if(read_packet->stream_index == video_index_) + Concurrency::send(video_target_, read_packet); + else if(read_packet->stream_index == audio_index_) + Concurrency::send(audio_target_, read_packet); + } - buffer_.try_push(read_packet); - buffer_size_ += read_packet->size; - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - } - } - - bool full() const - { - return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT; + return true; } void seek_frame(int64_t frame, int flags = 0) { - THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); - buffer_.push(nullptr); + THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); + auto packet = create_packet(); + packet->size = 0; + Concurrency::send(video_target_, loop_packet()); + Concurrency::send(audio_target_, loop_packet()); } bool is_eof(int ret) @@ -283,11 +254,31 @@ private: } }; -input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : impl_(new implementation(graph, filename, loop, start, length)){} -bool input::eof() const {return !impl_->is_running_;} -bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} -safe_ptr input::context(){return make_safe(impl_->format_context_);} -size_t input::nb_frames() const {return impl_->nb_frames();} -size_t input::nb_loops() const {return impl_->nb_loops();} +input::input(token_t& active_token, + target_t& video_target, + target_t& audio_target, + const safe_ptr& graph, + const std::wstring& filename, + bool loop, + size_t start, + size_t length) + : impl_(new implementation(active_token, video_target, audio_target, graph, filename, loop, start, length)) +{ +} + +safe_ptr input::context() +{ + return safe_ptr(impl_->format_context_); +} + +size_t input::nb_frames() const +{ + return impl_->nb_frames_; +} + +size_t input::nb_loops() const +{ + return impl_->nb_loops_; +} + }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/input.h b/modules/ffmpeg/producer/input.h index 6b391581f..ea7741da9 100644 --- a/modules/ffmpeg/producer/input.h +++ b/modules/ffmpeg/producer/input.h @@ -21,10 +21,12 @@ #include +#include #include #include #include +#include struct AVFormatContext; struct AVPacket; @@ -38,20 +40,28 @@ class graph; } namespace ffmpeg { - + class input : boost::noncopyable { public: - explicit input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits::max()); - - bool try_pop(std::shared_ptr& packet); - bool eof() const; + + typedef Concurrency::ISource token_t; + typedef Concurrency::ITarget> target_t; + explicit input(token_t& active_token, + target_t& video_target, + target_t& audio_target, + const safe_ptr& graph, + const std::wstring& filename, bool loop, + size_t start = 0, + size_t length = std::numeric_limits::max()); + size_t nb_frames() const; size_t nb_loops() const; - + safe_ptr context(); private: + friend struct implemenation; struct implementation; std::shared_ptr impl_; }; diff --git a/modules/ffmpeg/producer/util.cpp b/modules/ffmpeg/producer/util.cpp index 79fae01a8..a98c08737 100644 --- a/modules/ffmpeg/producer/util.cpp +++ b/modules/ffmpeg/producer/util.cpp @@ -5,7 +5,7 @@ #include "format/flv.h" #include -#include +#include #include #include @@ -14,7 +14,7 @@ #include -#include +#include #include #include @@ -127,7 +127,7 @@ int make_alpha_format(int format) safe_ptr make_write_frame(const void* tag, const safe_ptr& decoded_frame, const safe_ptr& frame_factory, int hints) { - static tbb::concurrent_unordered_map>> sws_contexts_; + static tbb::concurrent_unordered_map>> sws_contexts_; const auto width = decoded_frame->width; const auto height = decoded_frame->height; @@ -188,12 +188,10 @@ safe_ptr make_write_frame(const void* tag, const safe_ptrlinesize[n]; // Copy line by line since ffmpeg sometimes pads each line. - tbb::affinity_partitioner ap; - tbb::parallel_for(tbb::blocked_range(0, static_cast(desc.planes[n].height)), [&](const tbb::blocked_range& r) + Concurrency::parallel_for(0, static_cast(desc.planes[n].height), [&](size_t y) { - for(size_t y = r.begin(); y != r.end(); ++y) - memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize); - }, ap); + memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize); + }); write->commit(n); } @@ -287,4 +285,64 @@ void fix_meta_data(AVFormatContext& context) video_context.time_base.den = static_cast(closest_fps*1000000.0); } +std::shared_ptr create_packet() +{ + std::shared_ptr packet(new AVPacket, [](AVPacket* p) + { + av_free_packet(p); + delete p; + }); + + av_init_packet(packet.get()); + return packet; +} + +const std::shared_ptr& loop_packet() +{ + static auto packet1 = create_packet(); + return packet1; +} + +const std::shared_ptr& eof_packet() +{ + static auto packet2 = create_packet(); + return packet2; +} + +const std::shared_ptr& loop_video() +{ + static auto frame1 = std::shared_ptr(avcodec_alloc_frame(), av_free); + return frame1; +} + +const std::shared_ptr& empty_video() +{ + static auto frame1 = std::shared_ptr(avcodec_alloc_frame(), av_free); + return frame1; +} + +const std::shared_ptr& eof_video() +{ + static auto frame2 = std::shared_ptr(avcodec_alloc_frame(), av_free); + return frame2; +} + +const std::shared_ptr& loop_audio() +{ + static auto audio1 = std::make_shared(); + return audio1; +} + +const std::shared_ptr& empty_audio() +{ + static auto audio1 = std::make_shared(); + return audio1; +} + +const std::shared_ptr& eof_audio() +{ + static auto audio2 = std::make_shared(); + return audio2; +} + }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/util.h b/modules/ffmpeg/producer/util.h index 7b32ff9f0..1811413db 100644 --- a/modules/ffmpeg/producer/util.h +++ b/modules/ffmpeg/producer/util.h @@ -4,14 +4,25 @@ #include #include +#include -extern "C" + +#if defined(_MSC_VER) +#pragma warning (push) +#pragma warning (disable : 4244) +#endif +extern "C" { #include + #include } +#if defined(_MSC_VER) +#pragma warning (pop) +#endif struct AVFrame; struct AVFormatContext; +struct AVPacket; namespace caspar { @@ -35,4 +46,14 @@ safe_ptr make_write_frame(const void* tag, const safe_ptr create_packet(); +const std::shared_ptr& loop_packet(); +const std::shared_ptr& eof_packet(); +const std::shared_ptr& loop_video(); +const std::shared_ptr& empty_video(); +const std::shared_ptr& eof_video(); +const std::shared_ptr& loop_audio(); +const std::shared_ptr& empty_audio(); +const std::shared_ptr& eof_audio(); + }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/video/video_decoder.cpp b/modules/ffmpeg/producer/video/video_decoder.cpp index 4e2ba9664..cc0eaaef0 100644 --- a/modules/ffmpeg/producer/video/video_decoder.cpp +++ b/modules/ffmpeg/producer/video/video_decoder.cpp @@ -27,13 +27,7 @@ #include "../../ffmpeg_error.h" #include "../../tbb_avcodec.h" -#include -#include - -#include -#include - -#include +#include #if defined(_MSC_VER) #pragma warning (push) @@ -50,14 +44,15 @@ extern "C" namespace caspar { namespace ffmpeg { -struct video_decoder::implementation : boost::noncopyable +struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable { - const safe_ptr frame_factory_; + video_decoder::token_t& active_token_; + video_decoder::source_t& source_; + video_decoder::target_t& target_; + std::shared_ptr codec_context_; int index_; - std::queue> packets_; - filter filter_; double fps_; @@ -65,15 +60,24 @@ struct video_decoder::implementation : boost::noncopyable size_t width_; size_t height_; + bool is_progressive_; public: - explicit implementation(const safe_ptr& context, const safe_ptr& frame_factory, const std::wstring& filter) - : frame_factory_(frame_factory) - , filter_(filter) - , fps_(frame_factory_->get_video_format_desc().fps) + explicit implementation(video_decoder::token_t& active_token, + video_decoder::source_t& source, + video_decoder::target_t& target, + const safe_ptr& context, + double fps, + const std::wstring& filter) + : active_token_(active_token) + , source_(source) + , target_(target) + , filter_(filter.empty() ? L"copy" : filter) + , fps_(fps) , nb_frames_(0) , width_(0) , height_(0) + , is_progressive_(true) { try { @@ -106,72 +110,77 @@ public: CASPAR_LOG_CURRENT_EXCEPTION(); CASPAR_LOG(warning) << "[video_decoder] Failed to open video-stream. Running without video."; } - } - void push(const std::shared_ptr& packet) + start(); + } + + ~implementation() { - if(packet && packet->stream_index != index_) - return; - - packets_.push(packet); + agent::wait(this); } - - std::vector> poll() - { - std::vector> result; - - if(packets_.empty()) - return result; - - if(!codec_context_) - return empty_poll(); - - auto packet = packets_.front(); - - if(packet) - { - BOOST_FOREACH(auto& frame, decode(*packet)) - boost::range::push_back(result, filter_.execute(frame)); - - if(packet->size == 0) - packets_.pop(); - } - else + + virtual void run() + { + try { - if(codec_context_->codec->capabilities & CODEC_CAP_DELAY) + while(Concurrency::receive(active_token_)) { - AVPacket pkt; - av_init_packet(&pkt); - pkt.data = nullptr; - pkt.size = 0; - - BOOST_FOREACH(auto& frame, decode(pkt)) - boost::range::push_back(result, filter_.execute(frame)); - } - - if(result.empty()) - { - packets_.pop(); - avcodec_flush_buffers(codec_context_.get()); - result.push_back(nullptr); + auto packet = Concurrency::receive(source_); + if(packet == eof_packet()) + { + Concurrency::send(target_, eof_video()); + break; + } + + if(packet == loop_packet()) + { + if(codec_context_) + { + if(codec_context_->codec->capabilities & CODEC_CAP_DELAY) + { + AVPacket pkt; + av_init_packet(&pkt); + pkt.data = nullptr; + pkt.size = 0; + + BOOST_FOREACH(auto& frame1, decode(pkt)) + { + BOOST_FOREACH(auto& frame2, filter_.execute(frame1)) + Concurrency::send(target_, std::shared_ptr(frame2)); + } + } + + avcodec_flush_buffers(codec_context_.get()); + } + + Concurrency::send(target_, loop_video()); + } + else if(!codec_context_) + { + Concurrency::send(target_, empty_video()); + } + else + { + while(packet->size > 0) + { + BOOST_FOREACH(auto& frame1, decode(*packet)) + { + BOOST_FOREACH(auto& frame2, filter_.execute(frame1)) + Concurrency::send(target_, std::shared_ptr(frame2)); + } + } + } } } - - return result; - } - - std::vector> empty_poll() - { - auto packet = packets_.front(); - packets_.pop(); - - if(!packet) - return boost::assign::list_of(nullptr); - - std::shared_ptr frame(avcodec_alloc_frame(), av_free); - frame->data[0] = nullptr; + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } - return boost::assign::list_of(frame); + std::shared_ptr packet; + Concurrency::try_receive(source_, packet); + + done(); } std::vector> decode(AVPacket& pkt) @@ -193,27 +202,31 @@ public: if(decoded_frame->repeat_pict % 2 > 0) CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented."; + is_progressive_ = decoded_frame->interlaced_frame == 0; + return std::vector>(1 + decoded_frame->repeat_pict/2, decoded_frame); } - - bool ready() const - { - return !packets_.empty(); - } - + double fps() const { return fps_; } }; -video_decoder::video_decoder(const safe_ptr& context, const safe_ptr& frame_factory, const std::wstring& filter) : impl_(new implementation(context, frame_factory, filter)){} -void video_decoder::push(const std::shared_ptr& packet){impl_->push(packet);} -std::vector> video_decoder::poll(){return impl_->poll();} -bool video_decoder::ready() const{return impl_->ready();} +video_decoder::video_decoder(token_t& active_token, + source_t& source, + target_t& target, + const safe_ptr& context, + double fps, + const std::wstring& filter) + : impl_(new implementation(active_token, source, target, context, fps, filter)) +{ +} + double video_decoder::fps() const{return impl_->fps();} int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;} size_t video_decoder::width() const{return impl_->width_;} size_t video_decoder::height() const{return impl_->height_;} +bool video_decoder::is_progressive() const{return impl_->is_progressive_;} }} \ No newline at end of file diff --git a/modules/ffmpeg/producer/video/video_decoder.h b/modules/ffmpeg/producer/video/video_decoder.h index 1351e881b..2fb0b8563 100644 --- a/modules/ffmpeg/producer/video/video_decoder.h +++ b/modules/ffmpeg/producer/video/video_decoder.h @@ -25,6 +25,7 @@ #include +#include #include struct AVFormatContext; @@ -35,7 +36,6 @@ namespace caspar { namespace core { struct frame_factory; - class write_frame; } namespace ffmpeg { @@ -43,16 +43,23 @@ namespace ffmpeg { class video_decoder : boost::noncopyable { public: - explicit video_decoder(const safe_ptr& context, const safe_ptr& frame_factory, const std::wstring& filter); - - void push(const std::shared_ptr& packet); - bool ready() const; - std::vector> poll(); - + + typedef Concurrency::ISource token_t; + typedef Concurrency::ISource> source_t; + typedef Concurrency::ITarget> target_t; + + explicit video_decoder(token_t& active_token, + source_t& source, + target_t& target, + const safe_ptr& context, + double fps, + const std::wstring& filter); + size_t width() const; size_t height() const; int64_t nb_frames() const; + bool is_progressive() const; double fps() const; private: diff --git a/modules/ffmpeg/tbb_avcodec.cpp b/modules/ffmpeg/tbb_avcodec.cpp index 71d623d60..1f0eb7b2a 100644 --- a/modules/ffmpeg/tbb_avcodec.cpp +++ b/modules/ffmpeg/tbb_avcodec.cpp @@ -8,7 +8,8 @@ #include #include -#include +#include + #include #include #include @@ -31,14 +32,11 @@ namespace caspar { namespace ffmpeg { int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size) { - tbb::parallel_for(tbb::blocked_range(0, count), [&](const tbb::blocked_range& r) + Concurrency::parallel_for(0, count, [&](size_t n) { - for(size_t n = r.begin(); n != r.end(); ++n) - { - int r = func(s, reinterpret_cast(arg) + n*size); - if(ret) - ret[n] = r; - } + int r = func(s, reinterpret_cast(arg) + n*size); + if(ret) + ret[n] = r; }); return 0; @@ -51,15 +49,12 @@ int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg CASPAR_ASSERT(tbb::tbb_thread::hardware_concurrency() < 16); // Note: this will probably only work when tbb::task_scheduler_init::num_threads() < 16. - tbb::parallel_for(tbb::blocked_range(0, count, 2), [&](const tbb::blocked_range &r) + Concurrency::parallel_for(0, count, 2, [&](int jobnr) { int threadnr = counter++; - for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr) - { - int r = func(s, arg, jobnr, threadnr); - if (ret) - ret[jobnr] = r; - } + int r = func(s, arg, jobnr, threadnr); + if (ret) + ret[jobnr] = r; --counter; }); diff --git a/shell/casparcg.config b/shell/casparcg.config index 236741a0c..92838922f 100644 --- a/shell/casparcg.config +++ b/shell/casparcg.config @@ -10,7 +10,7 @@ true - 3 + 5 false @@ -33,13 +33,14 @@ - 1080p5000 + PAL 1 true + true - + -- 2.39.2