]> git.sesse.net Git - casparcg/commitdiff
2.0.1: ffmpeg: Replaced TBB implementation with better Concurrency Runtime based...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 17 Oct 2011 23:40:55 +0000 (23:40 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Mon, 17 Oct 2011 23:40:55 +0000 (23:40 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.0.1@1360 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

30 files changed:
common/common.vcxproj
common/common.vcxproj.filters
common/concrt/bounded_buffer.h [new file with mode: 0644]
common/concrt/scoped_oversubscription_token.h [new file with mode: 0644]
common/diagnostics/graph.cpp
common/diagnostics/graph.h
core/mixer/image/image_mixer.cpp
core/mixer/image/image_mixer.h
core/mixer/mixer.cpp
core/mixer/mixer.h
core/producer/frame/frame_factory.h
modules/decklink/interop/DeckLinkAPI_h.h
modules/decklink/interop/DeckLinkAPI_i.c
modules/ffmpeg/StdAfx.h
modules/ffmpeg/producer/audio/audio_decoder.cpp
modules/ffmpeg/producer/audio/audio_decoder.h
modules/ffmpeg/producer/audio/audio_resampler.cpp
modules/ffmpeg/producer/audio/audio_resampler.h
modules/ffmpeg/producer/ffmpeg_producer.cpp
modules/ffmpeg/producer/filter/parallel_yadif.cpp
modules/ffmpeg/producer/frame_muxer.cpp
modules/ffmpeg/producer/frame_muxer.h
modules/ffmpeg/producer/input.cpp
modules/ffmpeg/producer/input.h
modules/ffmpeg/producer/util.cpp
modules/ffmpeg/producer/util.h
modules/ffmpeg/producer/video/video_decoder.cpp
modules/ffmpeg/producer/video/video_decoder.h
modules/ffmpeg/tbb_avcodec.cpp
shell/casparcg.config

index 3a7700691946bb6b4a11c9a8c09b0b66b5ac7b14..3157645d4c6fc1212502f1de680f604aa459f00d 100644 (file)
   </ItemDefinitionGroup>\r
   <ItemGroup>\r
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
+    <ClInclude Include="concrt\bounded_buffer.h" />\r
+    <ClInclude Include="concrt\scoped_oversubscription_token.h" />\r
     <ClInclude Include="concurrency\com_context.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
index 54cce3a53fc4219395c28ca04e27ecfe0f2136df..8ebdeab3c312b6091f5398dca8f2c8a78dc11151 100644 (file)
@@ -37,6 +37,9 @@
     <Filter Include="source\compiler\vs">\r
       <UniqueIdentifier>{28c25c8a-1277-4d2c-9e85-5af33f9938ea}</UniqueIdentifier>\r
     </Filter>\r
+    <Filter Include="source\concrt">\r
+      <UniqueIdentifier>{d3ba93c7-82fb-4e69-97bc-cdbf7d6feb24}</UniqueIdentifier>\r
+    </Filter>\r
   </ItemGroup>\r
   <ItemGroup>\r
     <ClCompile Include="exception\win32_exception.cpp">\r
     <ClInclude Include="utility\move_on_copy.h">\r
       <Filter>source\utility</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="concrt\bounded_buffer.h">\r
+      <Filter>source\concrt</Filter>\r
+    </ClInclude>\r
+    <ClInclude Include="concrt\scoped_oversubscription_token.h">\r
+      <Filter>source\concrt</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
diff --git a/common/concrt/bounded_buffer.h b/common/concrt/bounded_buffer.h
new file mode 100644 (file)
index 0000000..c167e33
--- /dev/null
@@ -0,0 +1,584 @@
+#pragma once\r
+\r
+#include <agents.h>\r
+\r
+namespace Concurrency\r
+{\r
+\r
+/// <summary>\r
+///    A bounded_buffer implementation. Once the capacity is reached it will save the offered message\r
+///    id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume\r
+///    any of the postponed messages. Preference is given to previously offered messages before new ones.\r
+///\r
+///    NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. \r
+///          Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer\r
+///          implementation uses a critical_section, several interlocked operations, and additional calls to async_send.\r
+///          These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.\r
+/// </summary>\r
+/// <typeparam name="_Type">\r
+///     The payload type of messages stored and propagated by the buffer.\r
+/// </typeparam>\r
+template<class _Type>\r
+class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
+{\r
+public:\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
+    ///     group of the scheduler\92s choosing.\r
+    /// </summary>\r
+    bounded_buffer(const size_t capacity)\r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target();\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
+    ///     group of the scheduler\92s choosing.\r
+    /// </summary>\r
+    /// <param name="_Filter">\r
+    ///     A reference to a filter function.\r
+    /// </param>\r
+    bounded_buffer(const size_t capacity, filter_method const& _Filter)\r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target();\r
+        register_filter(_Filter);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
+    ///     group of the scheduler\92s choosing.\r
+    /// </summary>\r
+    /// <param name="_PScheduler">\r
+    ///     A reference to a scheduler instance.\r
+    /// </param>\r
+    bounded_buffer(const size_t capacity, Scheduler& _PScheduler)\r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target(&_PScheduler);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
+    ///     group of the scheduler\92s choosing.\r
+    /// </summary>\r
+    /// <param name="_PScheduler">\r
+    ///     A reference to a scheduler instance.\r
+    /// </param>\r
+    /// <param name="_Filter">\r
+    ///     A reference to a filter function.\r
+    /// </param>\r
+    bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) \r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target(&_PScheduler);\r
+        register_filter(_Filter);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
+    ///     by the schedule group.\r
+    /// </summary>\r
+    /// <param name="_PScheduleGroup">\r
+    ///     A reference to a schedule group.\r
+    /// </param>\r
+    bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)\r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target(NULL, &_PScheduleGroup);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
+    ///     by the schedule group.\r
+    /// </summary>\r
+    /// <param name="_PScheduleGroup">\r
+    ///     A reference to a schedule group.\r
+    /// </param>\r
+    /// <param name="_Filter">\r
+    ///     A reference to a filter function.\r
+    /// </param>\r
+    bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
+        : _M_capacity(capacity), _M_currentSize(0)\r
+    {\r
+        initialize_source_and_target(NULL, &_PScheduleGroup);\r
+        register_filter(_Filter);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Cleans up any resources that may have been used by the bounded_buffer.\r
+    /// </summary>\r
+    ~bounded_buffer()\r
+    {\r
+        // Remove all links\r
+        remove_network_links();\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Add an item to the bounded_buffer.\r
+    /// </summary>\r
+    /// <param name="_Item">\r
+    ///     A reference to the item to add.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     A boolean indicating whether the data was accepted.\r
+    /// </returns>\r
+    bool enqueue(_Type const& _Item)\r
+    {\r
+        return Concurrency::send<_Type>(this, _Item);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Remove an item from the bounded_buffer.\r
+    /// </summary>\r
+    /// <returns>\r
+    ///     The message payload.\r
+    /// </returns>\r
+    _Type dequeue()\r
+    {\r
+        return receive<_Type>(this);\r
+    }\r
+\r
+protected:\r
+\r
+    /// <summary>\r
+    ///     The main propagate() function for ITarget blocks.  Called by a source\r
+    ///     block, generally within an asynchronous task to send messages to its targets.\r
+    /// </summary>\r
+    /// <param name="_PMessage">\r
+    ///     A pointer to the message.\r
+    /// </param>\r
+    /// <param name="_PSource">\r
+    ///     A pointer to the source block offering the message.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     An indication of what the target decided to do with the message.\r
+    /// </returns>\r
+    /// <remarks>\r
+    ///     It is important that calls to propagate do *not* take the same lock on the\r
+    ///     internal structure that is used by Consume and the LWT.  Doing so could\r
+    ///     result in a deadlock with the Consume call.  (in the case of the bounded_buffer,\r
+    ///     this lock is the m_internalLock)\r
+    /// </remarks>\r
+    virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
+    {\r
+        message_status _Result = accepted;\r
+            \r
+        // Check current capacity. \r
+        if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+        {\r
+            // Postpone the message, buffer is full.\r
+            _InterlockedDecrement(&_M_currentSize);\r
+            _Result = postponed;\r
+\r
+            // Save off the message id from this source to later try\r
+            // and reserve/consume when more space is free.\r
+            {\r
+                critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+                _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
+            }\r
+\r
+            async_send(NULL);\r
+        }\r
+        else\r
+        {\r
+            //\r
+            // Accept the message being propagated\r
+            // Note: depending on the source block propagating the message\r
+            // this may not necessarily be the same message (pMessage) first\r
+            // passed into the function.\r
+            //\r
+            _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
+\r
+            if (_PMessage != NULL)\r
+            {\r
+                async_send(_PMessage);\r
+            }\r
+            else\r
+            {\r
+                // Didn't get a message so need to decrement.\r
+                _InterlockedDecrement(&_M_currentSize);\r
+                _Result = missed;\r
+                async_send(NULL);\r
+            }\r
+        }\r
+\r
+        return _Result;\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Synchronously sends a message to this block.  When this function completes the message will\r
+    ///     already have propagated into the block.\r
+    /// </summary>\r
+    /// <param name="_PMessage">\r
+    ///     A pointer to the message.\r
+    /// </param>\r
+    /// <param name="_PSource">\r
+    ///     A pointer to the source block offering the message.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     An indication of what the target decided to do with the message.\r
+    /// </returns>\r
+    virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
+    {\r
+        message_status _Result = accepted;\r
+            \r
+        // Check current capacity. \r
+        if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+        {\r
+            // Postpone the message, buffer is full.\r
+            _InterlockedDecrement(&_M_currentSize);\r
+            _Result = postponed;\r
+\r
+            // Save off the message id from this source to later try\r
+            // and reserve/consume when more space is free.\r
+            {\r
+                critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+                _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
+            }\r
+\r
+            async_send(NULL);\r
+        }\r
+        else\r
+        {\r
+            //\r
+            // Accept the message being propagated\r
+            // Note: depending on the source block propagating the message\r
+            // this may not necessarily be the same message (pMessage) first\r
+            // passed into the function.\r
+            //\r
+            _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
+\r
+            if (_PMessage != NULL)\r
+            {\r
+                async_send(_PMessage);\r
+            }\r
+            else\r
+            {\r
+                // Didn't get a message so need to decrement.\r
+                _InterlockedDecrement(&_M_currentSize);\r
+                _Result = missed;\r
+                async_send(NULL);\r
+            }\r
+        }\r
+\r
+        return _Result;\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Accepts an offered message by the source, transferring ownership to the caller.\r
+    /// </summary>\r
+    /// <param name="_MsgId">\r
+    ///     The runtime object identity of the message.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     A pointer to the message that the caller now has ownership of.\r
+    /// </returns>\r
+    virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
+    {\r
+        //\r
+        // Peek at the head message in the message buffer.  If the Ids match\r
+        // dequeue and transfer ownership\r
+        //\r
+        message<_Type> * _Msg = NULL;\r
+\r
+        if (_M_messageBuffer._Is_head(_MsgId))\r
+        {\r
+                       _Msg = _M_messageBuffer._Dequeue();\r
+\r
+            // Give preference to any previously postponed messages\r
+            // before decrementing current size.\r
+            if(!try_consume_msg())\r
+            {\r
+                _InterlockedDecrement(&_M_currentSize);\r
+            }\r
+        }\r
+\r
+        return _Msg;\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Try to reserve and consume a message from list of saved message ids.\r
+    /// </summary>\r
+    /// <returns>\r
+    ///     True if a message was sucessfully consumed, false otherwise.\r
+    /// </returns>\r
+    bool try_consume_msg()\r
+    {\r
+        runtime_object_identity _ReservedId = -1;\r
+        ISource<_Type> * _PSource = NULL;\r
+\r
+        // Walk through source links seeing if any saved ids exist.\r
+        bool _ConsumedMsg = true;\r
+        while(_ConsumedMsg)\r
+        {\r
+            source_iterator _Iter = _M_connectedSources.begin();\r
+            {\r
+                critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+                for (; *_Iter != NULL; ++_Iter)\r
+                {\r
+                    _PSource = *_Iter;\r
+                    std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;\r
+                    if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())\r
+                    {\r
+                        _ReservedId = _MapIter->second;\r
+                        _M_savedSourceMsgIds.erase(_MapIter);\r
+                        break;\r
+                    }\r
+                }\r
+            }\r
+\r
+            // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.\r
+            if(_ReservedId != -1)\r
+            {\r
+                if(_PSource->reserve(_ReservedId, this))\r
+                {\r
+                    message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);\r
+                    async_send(_ConsumedMsg);\r
+                    return true;\r
+                }\r
+                // Reserve failed go or link was removed, \r
+                // go back and try and find a different msg id.\r
+                else\r
+                {\r
+                    continue;\r
+                }\r
+            }\r
+\r
+            // If this point is reached the map of source ids was empty.\r
+            break;\r
+        }\r
+\r
+        return false;\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Reserves a message previously offered by the source.\r
+    /// </summary>\r
+    /// <param name="_MsgId">\r
+    ///     The runtime object identity of the message.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     A Boolean indicating whether the reservation worked or not.\r
+    /// </returns>\r
+    /// <remarks>\r
+    ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
+    /// </remarks>\r
+    virtual bool reserve_message(runtime_object_identity _MsgId)\r
+    {\r
+        // Allow reservation if this is the head message\r
+        return _M_messageBuffer._Is_head(_MsgId);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Consumes a message that was reserved previously.\r
+    /// </summary>\r
+    /// <param name="_MsgId">\r
+    ///     The runtime object identity of the message.\r
+    /// </param>\r
+    /// <returns>\r
+    ///     A pointer to the message that the caller now has ownership of.\r
+    /// </returns>\r
+    /// <remarks>\r
+    ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
+    /// </remarks>\r
+    virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
+    {\r
+        // By default, accept the message\r
+        return accept_message(_MsgId);\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Releases a previous message reservation.\r
+    /// </summary>\r
+    /// <param name="_MsgId">\r
+    ///     The runtime object identity of the message.\r
+    /// </param>\r
+    virtual void release_message(runtime_object_identity _MsgId)\r
+    {\r
+        // The head message is the one reserved.\r
+        if (!_M_messageBuffer._Is_head(_MsgId))\r
+        {\r
+            throw message_not_found();\r
+        }\r
+    }\r
+\r
+    /// <summary>\r
+    ///    Resumes propagation after a reservation has been released\r
+    /// </summary>\r
+    virtual void resume_propagation()\r
+    {\r
+        // If there are any messages in the buffer, propagate them out\r
+        if (_M_messageBuffer._Count() > 0)\r
+        {\r
+            async_send(NULL);\r
+        }\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Notification that a target was linked to this source.\r
+    /// </summary>\r
+    /// <param name="_PTarget">\r
+    ///     A pointer to the newly linked target.\r
+    /// </param>\r
+    virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
+    {\r
+        // If the message queue is blocked due to reservation\r
+        // there is no need to do any message propagation\r
+        if (_M_pReservedFor != NULL)\r
+        {\r
+            return;\r
+        }\r
+\r
+        message<_Type> * _Msg = _M_messageBuffer._Peek();\r
+\r
+        if (_Msg != NULL)\r
+        {\r
+            // Propagate the head message to the new target\r
+            message_status _Status = _PTarget->propagate(_Msg, this);\r
+\r
+            if (_Status == accepted)\r
+            {\r
+                // The target accepted the message, restart propagation.\r
+                propagate_to_any_targets(NULL);\r
+            }\r
+\r
+            // If the status is anything other than accepted, then leave\r
+            // the message queue blocked.\r
+        }\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Takes the message and propagates it to all the targets of this bounded_buffer.\r
+    ///     This is called from async_send.\r
+    /// </summary>\r
+    /// <param name="_PMessage">\r
+    ///     A pointer to a new message.\r
+    /// </param>\r
+    virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
+    {\r
+        // Enqueue pMessage to the internal message buffer if it is non-NULL.\r
+        // pMessage can be NULL if this LWT was the result of a Repropagate call\r
+        // out of a Consume or Release (where no new message is queued up, but\r
+        // everything remaining in the bounded buffer needs to be propagated out)\r
+        if (_PMessage != NULL)\r
+        {\r
+            _M_messageBuffer._Enqueue(_PMessage);\r
+\r
+            // If the incoming pMessage is not the head message, we can safely assume that\r
+            // the head message is blocked and waiting on Consume(), Release() or a new\r
+            // link_target() and cannot be propagated out.\r
+                       if (_M_messageBuffer._Is_head(_PMessage->msg_id()))\r
+            {\r
+                _Propagate_priority_order();\r
+            }\r
+        }\r
+        else\r
+        {\r
+            // While current size is less than capacity try to consume\r
+            // any previously offered ids.\r
+            bool _ConsumedMsg = true;\r
+            while(_ConsumedMsg)\r
+            {\r
+                // Assume a message will be found to successfully consume in the\r
+                // saved ids, if not this will be decremented afterwards.\r
+                if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+                {\r
+                    break;\r
+                }\r
+\r
+                _ConsumedMsg = try_consume_msg();\r
+            }\r
+\r
+            // Decrement the current size, we broke out of the previous loop\r
+            // because we reached capacity or there were no more messages to consume.\r
+            _InterlockedDecrement(&_M_currentSize);\r
+        }\r
+    }\r
+\r
+private:\r
+\r
+    /// <summary>\r
+    ///     Attempts to propagate out any messages currently in the block.\r
+    /// </summary>\r
+    void _Propagate_priority_order()\r
+    {\r
+        message<_Target_type> * _Msg = _M_messageBuffer._Peek();\r
+\r
+        // If someone has reserved the _Head message, don't propagate anymore\r
+        if (_M_pReservedFor != NULL)\r
+        {\r
+            return;\r
+        }\r
+\r
+        while (_Msg != NULL)\r
+        {\r
+            message_status _Status = declined;\r
+\r
+            // Always start from the first target that linked.\r
+            for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
+            {\r
+                ITarget<_Target_type> * _PTarget = *_Iter;\r
+                _Status = _PTarget->propagate(_Msg, this);\r
+\r
+                // Ownership of message changed. Do not propagate this\r
+                // message to any other target.\r
+                if (_Status == accepted)\r
+                {\r
+                    break;\r
+                }\r
+\r
+                // If the target just propagated to reserved this message, stop\r
+                // propagating it to others.\r
+                if (_M_pReservedFor != NULL)\r
+                {\r
+                    break;\r
+                }\r
+            }\r
+\r
+            // If status is anything other than accepted, then the head message\r
+            // was not propagated out.  Thus, nothing after it in the queue can\r
+            // be propagated out.  Cease propagation.\r
+            if (_Status != accepted)\r
+            {\r
+                break;\r
+            }\r
+\r
+            // Get the next message\r
+            _Msg = _M_messageBuffer._Peek();\r
+        }\r
+    }\r
+\r
+    /// <summary>\r
+    ///     Message buffer used to store messages.\r
+    /// </summary>\r
+    ::Concurrency::details::_Queue<message<_Type>> _M_messageBuffer;\r
+\r
+    /// <summary>\r
+    ///     Maximum number of messages bounded_buffer can hold.\r
+    /// </summary>\r
+    const size_t _M_capacity;\r
+\r
+    /// <summary>\r
+    ///     Current number of messages in bounded_buffer.\r
+    /// </summary>\r
+    volatile long _M_currentSize;\r
+\r
+    /// <summary>\r
+    ///     Lock used to guard saved message ids map.\r
+    /// </summary>\r
+    critical_section _M_savedIdsLock;\r
+\r
+    /// <summary>\r
+    ///     Map of source links to saved message ids.\r
+    /// </summary>\r
+    std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;\r
+\r
+    //\r
+    // Hide assignment operator and copy constructor\r
+    //\r
+    bounded_buffer const &operator =(bounded_buffer const&);  // no assignment operator\r
+    bounded_buffer(bounded_buffer const &);                   // no copy constructor\r
+};\r
+}
\ No newline at end of file
diff --git a/common/concrt/scoped_oversubscription_token.h b/common/concrt/scoped_oversubscription_token.h
new file mode 100644 (file)
index 0000000..71d6e35
--- /dev/null
@@ -0,0 +1,24 @@
+#pragma once\r
+\r
+#include <agents.h>\r
+\r
+namespace Concurrency {\r
+\r
+/// <summary>\r
+/// An RAII style wrapper around Concurrency::Context::Oversubscribe,\r
+/// useful for annotating known blocking calls\r
+/// </summary>\r
+class scoped_oversubcription_token\r
+{\r
+public:\r
+    scoped_oversubcription_token()\r
+    {\r
+        Concurrency::Context::CurrentContext()->Oversubscribe(true);\r
+    }\r
+    ~scoped_oversubcription_token()\r
+    {\r
+        Concurrency::Context::CurrentContext()->Oversubscribe(false);\r
+    }\r
+};\r
+\r
+}
\ No newline at end of file
index 5378f515642683b7f30d24d2ef7dcb515cbc18f7..213610aed37dee5a00ceda169927a2e08cdcacf4 100644 (file)
@@ -269,7 +269,7 @@ struct graph::implementation : public drawable
 \r
        implementation(const printer& parent_printer) \r
                : parent_printer_(parent_printer)\r
-               , name_(parent_printer_ ? narrow(parent_printer_()) : "")\r
+               , name_("")\r
                , counter_(0){}\r
 \r
        void update(const std::string& name, double value)\r
@@ -356,14 +356,20 @@ private:
        implementation& operator=(implementation&);\r
 };\r
        \r
-graph::graph(const std::string& name) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr)\r
+graph::graph(const std::string& name, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr)\r
 {\r
-       if(impl_)\r
-               context::register_drawable(impl_);\r
+       if(start)\r
+               graph::start();\r
 }\r
 \r
-graph::graph(const printer& parent_printer) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr)\r
+graph::graph(const printer& parent_printer, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr)\r
 {\r
+       if(start)\r
+               graph::start();\r
+}\r
+\r
+void graph::start()\r
+{      \r
        if(impl_)\r
                context::register_drawable(impl_);\r
 }\r
@@ -424,13 +430,13 @@ void graph::add_guide(const std::string& name, double value)
        }\r
 }\r
 \r
-safe_ptr<graph> create_graph(const std::string& name)\r
+safe_ptr<graph> create_graph(const std::string& name, bool start)\r
 {\r
-       return safe_ptr<graph>(new graph(name));\r
+       return safe_ptr<graph>(new graph(name, start));\r
 }\r
-safe_ptr<graph> create_graph(const printer& parent_printer)\r
+safe_ptr<graph> create_graph(const printer& parent_printer, bool start)\r
 {\r
-       return safe_ptr<graph>(new graph(parent_printer));\r
+       return safe_ptr<graph>(new graph(parent_printer, start));\r
 }\r
 \r
 \r
index 3f54c0a7c1b6f3ad9c18b9d84bade6b93192d4e1..dbba43fce21a00b65e4e46cf43092d8c18b1eec4 100644 (file)
@@ -51,11 +51,12 @@ struct color
 \r
 class graph\r
 {\r
-       friend safe_ptr<graph> create_graph(const std::string& name);\r
-       friend safe_ptr<graph> create_graph(const printer& parent_printer);\r
-       graph(const std::string& name);\r
-       graph(const printer& parent_printer);\r
+       friend safe_ptr<graph> create_graph(const std::string& name, bool start);\r
+       friend safe_ptr<graph> create_graph(const printer& parent_printer, bool start);\r
+       graph(const std::string& name, bool start = true);\r
+       graph(const printer& parent_printer, bool start = true);\r
 public:\r
+       void start();\r
        void update_value(const std::string& name, double value);\r
        void set_value(const std::string& name, double value);\r
        void set_color(const std::string& name, color c);\r
@@ -66,8 +67,8 @@ private:
        std::shared_ptr<implementation> impl_;\r
 };\r
 \r
-safe_ptr<graph> create_graph(const std::string& name);\r
-safe_ptr<graph> create_graph(const printer& parent_printer);\r
+safe_ptr<graph> create_graph(const std::string& name, bool start = true);\r
+safe_ptr<graph> create_graph(const printer& parent_printer, bool start = true);\r
        \r
 //namespace v2\r
 //{\r
index a93bcd8dec2a73592496a69436f050ccb232aa98..03280490f21a8676f2f3fe55b5df903afef8cea9 100644 (file)
@@ -285,6 +285,11 @@ public:
        {\r
                return make_safe<write_frame>(ogl_, tag, desc);\r
        }\r
+\r
+       boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* tag, const pixel_format_desc& desc)\r
+       {\r
+               return ogl_.begin_invoke([=]{return make_safe<write_frame>(ogl_, tag, desc);}, high_priority);\r
+       }\r
 };\r
 \r
 image_mixer::image_mixer(ogl_device& ogl, const video_format_desc& format_desc) : impl_(new implementation(ogl, format_desc)){}\r
@@ -293,6 +298,7 @@ void image_mixer::visit(write_frame& frame){impl_->visit(frame);}
 void image_mixer::end(){impl_->end();}\r
 boost::unique_future<safe_ptr<host_buffer>> image_mixer::render(){return impl_->render();}\r
 safe_ptr<write_frame> image_mixer::create_frame(const void* tag, const pixel_format_desc& desc){return impl_->create_frame(tag, desc);}\r
+boost::unique_future<safe_ptr<write_frame>> image_mixer::create_frame2(const void* tag, const pixel_format_desc& desc){return impl_->create_frame2(tag, desc);}\r
 void image_mixer::begin_layer(blend_mode::type blend_mode){impl_->begin_layer(blend_mode);}\r
 void image_mixer::end_layer(){impl_->end_layer();}\r
 image_mixer& image_mixer::operator=(image_mixer&& other)\r
index 266c3d43b8f923fc376561663bc3c203916fc9b4..436be02499238d64a06e2c0538c02fc5c5997586 100644 (file)
@@ -26,7 +26,6 @@
 #include <core/producer/frame/frame_visitor.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
-\r
 #include <boost/thread/future.hpp>\r
 \r
 namespace caspar { namespace core {\r
@@ -54,6 +53,7 @@ public:
        boost::unique_future<safe_ptr<host_buffer>> render();\r
 \r
        safe_ptr<write_frame> create_frame(const void* tag, const pixel_format_desc& format);\r
+       boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* tag, const pixel_format_desc& format);\r
 \r
 private:\r
        struct implementation;\r
index 8353d4170769bd351b345e049b7a00899be812c4..1f566e77db634b6045ee976791d9954bcb2f8adc 100644 (file)
@@ -159,6 +159,11 @@ public:
        {               \r
                return image_mixer_.create_frame(tag, desc);\r
        }\r
+\r
+       boost::unique_future<safe_ptr<core::write_frame>> create_frame2(const void* tag, const core::pixel_format_desc& desc)\r
+       {               \r
+               return image_mixer_.create_frame2(tag, desc);\r
+       }\r
                \r
        void set_transform(int index, const frame_transform& transform, unsigned int mix_duration, const std::wstring& tween)\r
        {\r
@@ -207,14 +212,7 @@ mixer::mixer(video_channel_context& video_channel) : impl_(new implementation(vi
 safe_ptr<core::read_frame> mixer::execute(const std::map<int, safe_ptr<core::basic_frame>>& frames){ return impl_->execute(frames);}\r
 core::video_format_desc mixer::get_video_format_desc() const { return impl_->channel_.get_format_desc(); }\r
 safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); }                \r
-safe_ptr<core::write_frame> mixer::create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt)\r
-{\r
-       // Create bgra frame\r
-       core::pixel_format_desc desc;\r
-       desc.pix_fmt = pix_fmt;\r
-       desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4));\r
-       return create_frame(tag, desc);\r
-}\r
+boost::unique_future<safe_ptr<write_frame>> mixer::create_frame2(const void* video_stream_tag, const pixel_format_desc& desc){ return impl_->create_frame2(video_stream_tag, desc); }                  \r
 void mixer::set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration, const std::wstring& tween){impl_->set_transform(index, transform, mix_duration, tween);}\r
 void mixer::apply_frame_transform(int index, const std::function<core::frame_transform(core::frame_transform)>& transform, unsigned int mix_duration, const std::wstring& tween){impl_->apply_transform(index, transform, mix_duration, tween);}\r
 void mixer::clear_transforms(){impl_->clear_transforms();}\r
index f7bd73efe624bcf8723136c6a66d96aefd2070c7..a2948c02519b22780d462f9fbe98dcf3a24827c7 100644 (file)
@@ -49,9 +49,9 @@ public:
                \r
        safe_ptr<core::read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames); // nothrow\r
                \r
-       safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc);         \r
-       safe_ptr<core::write_frame> create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt = core::pixel_format::bgra);            \r
-       \r
+       safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc);                 \r
+       boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc);\r
+\r
        core::video_format_desc get_video_format_desc() const; // nothrow\r
 \r
 \r
index c8880155bec68e11e00f843754c440a9f4ee8bb4..efaa38b5402b1db05c8e5630302d7b6cbb32dbc0 100644 (file)
 #pragma once\r
 \r
 #include "pixel_format.h"\r
+#include "../../video_format.h"\r
 \r
 #include <common/memory/safe_ptr.h>\r
+#include <common/concrt/scoped_oversubscription_token.h>\r
+\r
+#include <concrt.h>\r
 \r
 #include <boost/noncopyable.hpp>\r
+#include <boost/thread/future.hpp>\r
 \r
 namespace caspar { namespace core {\r
        \r
 class write_frame;\r
-struct pixel_format_desc;\r
 struct video_format_desc;\r
                \r
 struct frame_factory : boost::noncopyable\r
 {\r
        virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, const pixel_format_desc& desc) = 0;\r
-       virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra) = 0;             \r
+       virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra)\r
+       {       \r
+               // Create bgra frame\r
+               core::pixel_format_desc desc;\r
+               desc.pix_fmt = pix_fmt;\r
+               desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4));\r
+               return create_frame(video_stream_tag, desc);\r
+       }\r
        \r
+       virtual boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc) = 0;\r
+\r
        virtual video_format_desc get_video_format_desc() const = 0; // nothrow\r
 };\r
+       \r
+class concrt_frame_factory : public frame_factory\r
+{      \r
+       safe_ptr<frame_factory> factory_;\r
+public:\r
+       concrt_frame_factory(const safe_ptr<frame_factory>& factory) \r
+               : factory_(factory)\r
+       {\r
+       }\r
+               \r
+       virtual safe_ptr<write_frame> create_frame(const void* tag, const pixel_format_desc& desc)\r
+       {\r
+               auto frame = factory_->create_frame2(tag, desc);\r
+\r
+               Concurrency::wait(0);\r
+\r
+               if(!frame.has_value())\r
+               {\r
+                       Concurrency::scoped_oversubcription_token oversubscribe;\r
+                       frame.wait();\r
+               }\r
+               return frame.get();\r
+       }\r
+\r
+       virtual boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc)\r
+       {\r
+               return factory_->create_frame2(video_stream_tag, desc);\r
+       }\r
+\r
+       virtual video_format_desc get_video_format_desc() const\r
+       {\r
+               return factory_->get_video_format_desc();\r
+       }\r
+};\r
 \r
 }}
\ No newline at end of file
index 4a466545a801cf263d739eaa12fe7983ee267fb7..11230ecccdaefd5340f5fabc832529cddb2e6d81 100644 (file)
@@ -4,7 +4,7 @@
 \r
 \r
  /* File created by MIDL compiler version 7.00.0555 */\r
-/* at Wed Sep 21 22:02:53 2011\r
+/* at Tue Oct 18 00:55:44 2011\r
  */\r
 /* Compiler settings for interop\DeckLinkAPI.idl:\r
     Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
index a31a6cd25bed4537f68ba0e6b70501701e335642..1b61c2c29073822ca291445cd5bb44799926c259 100644 (file)
@@ -6,7 +6,7 @@
 \r
 \r
  /* File created by MIDL compiler version 7.00.0555 */\r
-/* at Wed Sep 21 22:02:53 2011\r
+/* at Tue Oct 18 00:55:44 2011\r
  */\r
 /* Compiler settings for interop\DeckLinkAPI.idl:\r
     Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
index fb398771232803c06fbc1e4c1009b53fec784dbd..9dbd5c4f484c0a1b6e799598d8e11d738c22a45c 100644 (file)
@@ -41,9 +41,8 @@
 #include <string>\r
 #include <math.h>\r
 \r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/parallel_for.h>\r
+#include <agents.h>\r
+#include <ppl.h>\r
 \r
 #include <boost/assign.hpp>\r
 #include <boost/filesystem.hpp>\r
index 3f5fe7f7269e8fa77ea1186d67ea16669c7aded0..a8bb817b8363618e6474753ebbf3a4215cbd9746 100644 (file)
 #include "../../stdafx.h"\r
 \r
 #include "audio_decoder.h"\r
-\r
 #include "audio_resampler.h"\r
 \r
+#include "../util.h"\r
 #include "../../ffmpeg_error.h"\r
 \r
 #include <core/video_format.h>\r
 \r
 #include <tbb/cache_aligned_allocator.h>\r
 \r
-#include <queue>\r
-\r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
 #pragma warning (disable : 4244)\r
@@ -46,21 +44,30 @@ extern "C"
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct audio_decoder::implementation : boost::noncopyable\r
-{      \r
+struct audio_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
+{\r
+       audio_decoder::token_t&  active_token_;\r
+       audio_decoder::source_t& source_;\r
+       audio_decoder::target_t& target_;\r
+\r
        std::shared_ptr<AVCodecContext>                                                         codec_context_;         \r
        const core::video_format_desc                                                           format_desc_;\r
        int                                                                                                                     index_;\r
        std::unique_ptr<audio_resampler>                                                        resampler_;\r
 \r
        std::vector<int8_t,  tbb::cache_aligned_allocator<int8_t>>      buffer1_;\r
-\r
-       std::queue<std::shared_ptr<AVPacket>>                                           packets_;\r
-\r
+       \r
        int64_t                                                                                                         nb_frames_;\r
 public:\r
-       explicit implementation(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
-               : format_desc_(format_desc)     \r
+       explicit implementation(audio_decoder::token_t& active_token,\r
+                                                       audio_decoder::source_t& source,\r
+                                                       audio_decoder::target_t& target,\r
+                                                       const safe_ptr<AVFormatContext>& context, \r
+                                                       const core::video_format_desc& format_desc) \r
+               : active_token_(active_token)\r
+               , source_(source)\r
+               , target_(target)\r
+               , format_desc_(format_desc)     \r
                , nb_frames_(0)\r
        {                               \r
                try\r
@@ -85,55 +92,51 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                        CASPAR_LOG(warning) << "[audio_decoder] Failed to open audio-stream. Running without audio.";                   \r
                }\r
-       }\r
 \r
-       void push(const std::shared_ptr<AVPacket>& packet)\r
-       {                       \r
-               if(packet && packet->stream_index != index_)\r
-                       return;\r
+               start();\r
+       }\r
 \r
-               packets_.push(packet);\r
-       }       \r
-       \r
-       std::vector<std::shared_ptr<core::audio_buffer>> poll()\r
+       ~implementation()\r
        {\r
-               std::vector<std::shared_ptr<core::audio_buffer>> result;\r
-\r
-               if(packets_.empty())\r
-                       return result;\r
-\r
-               if(!codec_context_)\r
-                       return empty_poll();\r
-               \r
-               auto packet = packets_.front();\r
+               agent::wait(this);\r
+       }       \r
 \r
-               if(packet)              \r
+       virtual void run()\r
+       {\r
+               try\r
                {\r
-                       result.push_back(decode(*packet));\r
-                       if(packet->size == 0)                                   \r
-                               packets_.pop();\r
+                       while(Concurrency::receive(active_token_))\r
+                       {\r
+                               auto packet = Concurrency::receive(source_);\r
+                               if(packet == eof_packet())\r
+                               {\r
+                                       Concurrency::send(target_, eof_audio());\r
+                                       break;\r
+                               }\r
+\r
+                               if(packet == loop_packet())     \r
+                               {       \r
+                                       if(codec_context_)\r
+                                               avcodec_flush_buffers(codec_context_.get());\r
+                                       Concurrency::send(target_, loop_audio());\r
+                               }       \r
+                               else if(!codec_context_)\r
+                                       Concurrency::send(target_, empty_audio());                                      \r
+                               else            \r
+                                       Concurrency::send(target_, decode(*packet));            \r
+                       }\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
-               else                    \r
-               {       \r
-                       avcodec_flush_buffers(codec_context_.get());\r
-                       result.push_back(nullptr);\r
-                       packets_.pop();\r
-               }               \r
-\r
-               return result;\r
-       }\r
 \r
-       std::vector<std::shared_ptr<core::audio_buffer>> empty_poll()\r
-       {\r
-               auto packet = packets_.front();\r
-               packets_.pop();\r
+               std::shared_ptr<AVPacket> packet;\r
+               Concurrency::try_receive(source_, packet);                                              \r
 \r
-               if(!packet)                     \r
-                       return boost::assign::list_of(nullptr);\r
-               \r
-               return boost::assign::list_of(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0));   \r
+               done();\r
        }\r
-\r
+       \r
        std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
        {               \r
                buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
@@ -154,17 +157,16 @@ public:
 \r
                return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
        }\r
-\r
-       bool ready() const\r
-       {\r
-               return !packets_.empty();\r
-       }\r
 };\r
 \r
-audio_decoder::audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){}\r
-void audio_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-bool audio_decoder::ready() const{return impl_->ready();}\r
-std::vector<std::shared_ptr<core::audio_buffer>> audio_decoder::poll(){return impl_->poll();}\r
+audio_decoder::audio_decoder(token_t& active_token,\r
+                                                        source_t& source,\r
+                                                        target_t& target,\r
+                                                        const safe_ptr<AVFormatContext>& context, \r
+                                                        const core::video_format_desc& format_desc)\r
+       : impl_(new implementation(active_token, source, target, context, format_desc))\r
+{\r
+}\r
 int64_t audio_decoder::nb_frames() const{return impl_->nb_frames_;}\r
 \r
 }}
\ No newline at end of file
index e446bd20f882e84e60c3b9fb00f7c9aea342eddf..b70a307b30d519875ed8c4620cd3221d144de64a 100644 (file)
@@ -25,6 +25,7 @@
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <agents.h>\r
 #include <vector>\r
 \r
 struct AVPacket;\r
@@ -43,15 +44,21 @@ namespace ffmpeg {
 class audio_decoder : boost::noncopyable\r
 {\r
 public:\r
-       explicit audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
-       \r
-       void push(const std::shared_ptr<AVPacket>& packet);\r
-       bool ready() const;\r
-       std::vector<std::shared_ptr<core::audio_buffer>> poll();\r
 \r
+       typedef Concurrency::ISource<bool>                                                                      token_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<AVPacket>>                         source_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>>       target_t;\r
+\r
+       explicit audio_decoder(token_t& active_token,\r
+                                                  source_t& source,\r
+                                                  target_t& target,\r
+                                                  const safe_ptr<AVFormatContext>& context, \r
+                                                  const core::video_format_desc& format_desc);\r
+       \r
        int64_t nb_frames() const;\r
 \r
 private:\r
+       \r
        struct implementation;\r
        safe_ptr<implementation> impl_;\r
 };\r
index 179d09cc589e92408df493f59270451deb75104a..9f953f130b7a274a8ed5e645092b1684df6b6146 100644 (file)
@@ -62,7 +62,7 @@ struct audio_resampler::implementation
 \r
        std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> resample(std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>>&& data)\r
        {\r
-               if(resampler_)\r
+               if(resampler_ && !data.empty())\r
                {\r
                        buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
                        auto ret = audio_resample(resampler_.get(),\r
index e25d06bf5d99caa1570b6c87fe803e9ac57b51f8..c03491cbe237c878bbb75630655de367b25e3633 100644 (file)
@@ -4,6 +4,8 @@
 \r
 #include <libavutil/samplefmt.h>\r
 \r
+#include <tbb/cache_aligned_allocator.h>\r
+\r
 namespace caspar { namespace ffmpeg {\r
 \r
 class audio_resampler\r
index 480e95b8d0d3739eb4db855e35613adc428ca9fc..ee30e2bbcf634acd34d07f6874b367163cb5850f 100644 (file)
@@ -27,6 +27,7 @@
 #include "audio/audio_decoder.h"\r
 #include "video/video_decoder.h"\r
 \r
+#include <common/concrt/bounded_buffer.h>\r
 #include <common/env.h>\r
 #include <common/utility/assert.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <boost/range/algorithm/find_if.hpp>\r
 #include <boost/range/algorithm/find.hpp>\r
 \r
-#include <tbb/parallel_invoke.h>\r
+#include <agents.h>\r
+#include <ppl.h>\r
 \r
 namespace caspar { namespace ffmpeg {\r
+\r
+template<typename T>\r
+struct buffer_alias\r
+{\r
+       typedef Concurrency::bounded_buffer<std::shared_ptr<T>> type;\r
+};\r
                                \r
 struct ffmpeg_producer : public core::frame_producer\r
-{\r
-       const std::wstring                                                              filename_;\r
-       \r
-       const safe_ptr<diagnostics::graph>                              graph_;\r
-       boost::timer                                                                    frame_timer_;\r
-       boost::timer                                                                    video_timer_;\r
-       boost::timer                                                                    audio_timer_;\r
+{      \r
+       const std::wstring                                              filename_;\r
+       const int                                                               start_;\r
+       const bool                                                              loop_;\r
+       const size_t                                                    length_;\r
+\r
+       buffer_alias<AVPacket>::type                    video_packets_;\r
+       buffer_alias<AVPacket>::type                    audio_packets_;\r
+       buffer_alias<AVFrame>::type                             video_frames_;\r
+       buffer_alias<core::audio_buffer>::type  audio_buffers_;\r
+       buffer_alias<core::basic_frame>::type   muxed_frames_;\r
+       Concurrency::overwrite_buffer<bool>             active_token_;\r
+               \r
+       const safe_ptr<diagnostics::graph>              graph_;\r
                                        \r
-       const safe_ptr<core::frame_factory>                             frame_factory_;\r
-       const core::video_format_desc                                   format_desc_;\r
-\r
-       input                                                                                   input_; \r
-       video_decoder                                                                   video_decoder_;\r
-       audio_decoder                                                                   audio_decoder_; \r
-       double                                                                                  fps_;\r
-       frame_muxer                                                                             muxer_;\r
-\r
-       const int                                                                               start_;\r
-       const bool                                                                              loop_;\r
-       const size_t                                                                    length_;\r
+       input                                                                   input_; \r
+       video_decoder                                                   video_decoder_;\r
+       audio_decoder                                                   audio_decoder_; \r
+       frame_muxer2                                                    muxer_;\r
 \r
-       safe_ptr<core::basic_frame>                                             last_frame_;\r
-\r
-       const size_t                                                                    width_;\r
-       const size_t                                                                    height_;\r
-       bool                                                                                    is_progressive_;\r
+       safe_ptr<core::basic_frame>                             last_frame_;\r
        \r
 public:\r
        explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
                : filename_(filename)\r
-               , graph_(diagnostics::create_graph([this]{return print();}))\r
-               , frame_factory_(frame_factory)         \r
-               , format_desc_(frame_factory->get_video_format_desc())\r
-               , input_(graph_, filename_, loop, start, length)\r
-               , video_decoder_(input_.context(), frame_factory, filter)\r
-               , audio_decoder_(input_.context(), frame_factory->get_video_format_desc())\r
-               , fps_(video_decoder_.fps())\r
-               , muxer_(fps_, frame_factory)\r
                , start_(start)\r
                , loop_(loop)\r
                , length_(length)\r
+               , video_packets_(25)\r
+               , audio_packets_(25)\r
+               , video_frames_(2)\r
+               , audio_buffers_(2)\r
+               , muxed_frames_(2)\r
+               , graph_(diagnostics::create_graph([this]{return print();}, false))\r
+               , input_(active_token_, video_packets_, audio_packets_, graph_, filename_, loop, start, length)\r
+               , video_decoder_(active_token_, video_packets_, video_frames_, input_.context(), frame_factory->get_video_format_desc().fps, filter)\r
+               , audio_decoder_(active_token_, audio_packets_, audio_buffers_, input_.context(), frame_factory->get_video_format_desc())\r
+               , muxer_(active_token_, video_frames_, audio_buffers_, muxed_frames_, video_decoder_.fps(), frame_factory)\r
                , last_frame_(core::basic_frame::empty())\r
-               , width_(video_decoder_.width())\r
-               , height_(video_decoder_.height())\r
-               , is_progressive_(true)\r
        {\r
-               graph_->add_guide("frame-time", 0.5);\r
-               graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
                graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f));   \r
-               \r
-               // Do some pre-work in order to not block rendering thread for initialization and allocations.\r
+               graph_->start();\r
 \r
-               push_packets();\r
-               auto video_frames = video_decoder_.poll();\r
-               if(!video_frames.empty())\r
-               {\r
-                       auto& video_frame = video_frames.front();\r
-                       auto  desc                = get_pixel_format_desc(static_cast<PixelFormat>(video_frame->format), video_frame->width, video_frame->height);\r
-                       if(desc.pix_fmt == core::pixel_format::invalid)\r
-                               get_pixel_format_desc(PIX_FMT_BGRA, video_frame->width, video_frame->height);\r
-                       \r
-                       for(int n = 0; n < 3; ++n)\r
-                               frame_factory->create_frame(this, desc);\r
-               }\r
-               BOOST_FOREACH(auto& video, video_frames)                        \r
-                       muxer_.push(video, 0);  \r
+               Concurrency::send(active_token_, true);\r
+       }\r
+\r
+       ~ffmpeg_producer()\r
+       {\r
+               Concurrency::send(active_token_, false);\r
+               std::shared_ptr<core::basic_frame> frame;\r
+               Concurrency::try_receive(muxed_frames_, frame);\r
        }\r
-                       \r
+                                       \r
        virtual safe_ptr<core::basic_frame> receive(int hints)\r
        {\r
                auto frame = core::basic_frame::late();\r
                \r
-               frame_timer_.restart();\r
-               \r
-               for(int n = 0; n < 64 && muxer_.empty(); ++n)\r
-                       decode_frame(hints);\r
-               \r
-               graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
-\r
-               if(!muxer_.empty())\r
-                       frame = last_frame_ = muxer_.pop();     \r
-               else\r
-               {\r
-                       if(input_.eof())\r
-                               return core::basic_frame::eof();\r
-                       else                    \r
-                               graph_->add_tag("underflow");   \r
+               try\r
+               {               \r
+                       frame = last_frame_ = safe_ptr<core::basic_frame>(Concurrency::receive(muxed_frames_, 8));\r
                }\r
-               \r
+               catch(Concurrency::operation_timed_out&)\r
+               {               \r
+                       graph_->add_tag("underflow");   \r
+               }\r
+\r
                return frame;\r
        }\r
 \r
@@ -145,51 +130,8 @@ public:
        {\r
                return disable_audio(last_frame_);\r
        }\r
-\r
-       void push_packets()\r
-       {\r
-               for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
-               {\r
-                       std::shared_ptr<AVPacket> pkt;\r
-                       if(input_.try_pop(pkt))\r
-                       {\r
-                               video_decoder_.push(pkt);\r
-                               audio_decoder_.push(pkt);\r
-                       }\r
-               }\r
-       }\r
-\r
-       void decode_frame(int hints)\r
-       {\r
-               push_packets();\r
-               \r
-               tbb::parallel_invoke(\r
-               [&]\r
-               {\r
-                       if(muxer_.video_ready())\r
-                               return;\r
-\r
-                       auto video_frames = video_decoder_.poll();\r
-                       BOOST_FOREACH(auto& video, video_frames)        \r
-                       {\r
-                               is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
-                               muxer_.push(video, hints);      \r
-                       }\r
-               },\r
-               [&]\r
-               {\r
-                       if(muxer_.audio_ready())\r
-                               return;\r
-                                       \r
-                       auto audio_samples = audio_decoder_.poll();\r
-                       BOOST_FOREACH(auto& audio, audio_samples)\r
-                               muxer_.push(audio);                             \r
-               });\r
-\r
-               muxer_.commit();\r
-       }\r
-\r
-       virtual int64_t nb_frames() const \r
+       \r
+       virtual int64_t nb_frames() const\r
        {\r
                if(loop_)\r
                        return std::numeric_limits<int64_t>::max();\r
@@ -215,8 +157,8 @@ public:
        virtual std::wstring print() const\r
        {\r
                return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
-                                                 + boost::lexical_cast<std::wstring>(width_) + L"x" + boost::lexical_cast<std::wstring>(height_)\r
-                                                 + (is_progressive_ ? L"p" : L"i")  + boost::lexical_cast<std::wstring>(is_progressive_ ? fps_ : 2.0 * fps_)\r
+                                                 + boost::lexical_cast<std::wstring>(video_decoder_.width()) + L"x" + boost::lexical_cast<std::wstring>(video_decoder_.height())\r
+                                                 + (video_decoder_.is_progressive() ? L"p" : L"i")  + boost::lexical_cast<std::wstring>(video_decoder_.is_progressive() ? video_decoder_.fps() : 2.0 * video_decoder_.fps())\r
                                                  + L"]";\r
        }\r
 };\r
index 0f2f04f1a0d202574c8fade9ac0e59d9228c7c6c..892f17572418916c4a9c0f0c09661cff7430178d 100644 (file)
@@ -16,8 +16,8 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
-#include <tbb/parallel_for.h>\r
-#include <tbb/concurrent_queue.h>\r
+#include <ppl.h>\r
+#include <concurrent_queue.h>\r
 \r
 #include <boost/thread/once.hpp>\r
 \r
@@ -67,10 +67,9 @@ void parallel_yadif_filter_line(parallel_yadif_context& ctx, uint8_t *dst, uint8
        \r
        if(ctx.index == ctx.last_index)\r
        {               \r
-               tbb::parallel_for(tbb::blocked_range<size_t>(0, ctx.index), [=](const tbb::blocked_range<size_t>& r)\r
+               Concurrency::parallel_for(0, ctx.index, [=](size_t n)\r
                {\r
-                       for(auto n = r.begin(); n != r.end(); ++n)\r
-                               org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode);\r
+                       org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode);\r
                });\r
                ctx.index = 0;\r
        }\r
@@ -78,7 +77,7 @@ void parallel_yadif_filter_line(parallel_yadif_context& ctx, uint8_t *dst, uint8
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-tbb::concurrent_bounded_queue<decltype(org_yadif_filter_line)> parallel_line_func_pool;\r
+Concurrency::concurrent_queue<decltype(org_yadif_filter_line)> parallel_line_func_pool;\r
 std::array<parallel_yadif_context, 18> ctxs;\r
 \r
 #define RENAME(a) f ## a\r
index 807b86e82adf601ea96bd2678888c2ba5f408cde..c9b750a2ff9b291260753ca1dd4593148f9d8d4c 100644 (file)
@@ -35,6 +35,9 @@ extern "C"
 #include <boost/foreach.hpp>\r
 #include <boost/range/algorithm_ext/push_back.hpp>\r
 \r
+#include <agents.h>\r
+#include <ppl.h>\r
+\r
 #include <deque>\r
 #include <queue>\r
 #include <vector>\r
@@ -419,4 +422,365 @@ bool frame_muxer::video_ready() const{return impl_->video_ready();}
 bool frame_muxer::audio_ready() const{return impl_->audio_ready();}\r
 int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
 \r
+\r
+struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
+{      \r
+       frame_muxer2::token_t&                  active_token_;\r
+       frame_muxer2::video_source_t&   video_source_;\r
+       frame_muxer2::audio_source_t&   audio_source_;\r
+       frame_muxer2::target_t&                 target_;\r
+\r
+       std::deque<std::queue<safe_ptr<write_frame>>>   video_streams_;\r
+       std::deque<core::audio_buffer>                                  audio_streams_;\r
+       std::deque<safe_ptr<basic_frame>>                               frame_buffer_;\r
+       display_mode::type                                                              display_mode_;\r
+       const double                                                                    in_fps_;\r
+       const video_format_desc                                                 format_desc_;\r
+       bool                                                                                    auto_transcode_;\r
+\r
+       size_t                                                                                  audio_sample_count_;\r
+       size_t                                                                                  video_frame_count_;\r
+               \r
+       size_t                                                                                  processed_audio_sample_count_;\r
+       size_t                                                                                  processed_video_frame_count_;\r
+\r
+       filter                                                                                  filter_;\r
+       safe_ptr<core::frame_factory>                                   frame_factory_;\r
+                               \r
+       implementation(frame_muxer2::token_t& active_token,\r
+                                  frame_muxer2::video_source_t& video_source,\r
+                                  frame_muxer2::audio_source_t& audio_source,\r
+                                  frame_muxer2::target_t& target,\r
+                                  double in_fps, \r
+                                  const safe_ptr<core::frame_factory>& frame_factory)\r
+               : active_token_(active_token)\r
+               , video_source_(video_source)\r
+               , audio_source_(audio_source)\r
+               , target_(target)\r
+               , video_streams_(1)\r
+               , audio_streams_(1)\r
+               , display_mode_(display_mode::invalid)\r
+               , in_fps_(in_fps)\r
+               , format_desc_(frame_factory->get_video_format_desc())\r
+               , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
+               , audio_sample_count_(0)\r
+               , video_frame_count_(0)\r
+               , frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
+       {\r
+               start();\r
+       }\r
+\r
+       ~implementation()\r
+       {\r
+               agent::wait(this);\r
+       }\r
+\r
+       virtual void run()\r
+       {\r
+               try\r
+               {\r
+                       while(Concurrency::receive(active_token_))\r
+                       {\r
+                               Concurrency::parallel_invoke(\r
+                               [&]\r
+                               {\r
+                                       while(!video_ready())\r
+                                       {\r
+                                               auto video = Concurrency::receive(video_source_);\r
+                                               if(video == eof_video())\r
+                                                       break;\r
+                                               push(video, 0); \r
+                                       }\r
+                               },\r
+                               [&]\r
+                               {\r
+                                       while(!audio_ready())\r
+                                       {\r
+                                               auto audio = Concurrency::receive(audio_source_);\r
+                                               if(audio == eof_audio())\r
+                                                       break;\r
+                                               push(audio);    \r
+                                       }                                       \r
+                               });\r
+\r
+                               if(!video_ready() || !audio_ready())\r
+                               {\r
+                                       Concurrency::send(target_, std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));\r
+                                       break;\r
+                               }\r
+\r
+                               commit();\r
+                       \r
+                               if(!frame_buffer_.empty())\r
+                               {\r
+                                       Concurrency::send(target_, std::shared_ptr<core::basic_frame>(frame_buffer_.front()));\r
+                                       frame_buffer_.pop_front();      \r
+                               }\r
+                       }\r
+               }\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
+\r
+               std::shared_ptr<AVFrame> video;\r
+               Concurrency::try_receive(video_source_, video);\r
+               std::shared_ptr<core::audio_buffer> audio;\r
+               Concurrency::try_receive(audio_source_, audio);\r
+                                       \r
+               done();\r
+       }\r
+\r
+       void push(const std::shared_ptr<AVFrame>& video_frame, int hints)\r
+       {               \r
+               if(video_frame == loop_video())\r
+               {       \r
+                       CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
+                       video_frame_count_ = 0;\r
+                       video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
+                       return;\r
+               }\r
+\r
+               if(video_frame == empty_video())\r
+               {\r
+                       video_streams_.back().push(make_safe<core::write_frame>(this));\r
+                       ++video_frame_count_;\r
+                       display_mode_ = display_mode::simple;\r
+                       return;\r
+               }\r
+\r
+               if(display_mode_ == display_mode::invalid)\r
+               {\r
+                       if(auto_transcode_)\r
+                       {\r
+                               auto in_mode = get_mode(*video_frame);\r
+                               display_mode_ = get_display_mode(in_mode, in_fps_, format_desc_.field_mode, format_desc_.fps);\r
+                       \r
+                               if(display_mode_ == display_mode::simple && in_mode != core::field_mode::progressive && format_desc_.field_mode != core::field_mode::progressive && video_frame->height != static_cast<int>(format_desc_.height))\r
+                                       display_mode_ = display_mode::deinterlace_bob_reinterlace; // The frame will most likely be scaled, we need to deinterlace->reinterlace \r
+                               \r
+                               if(display_mode_ == display_mode::deinterlace)\r
+                                       filter_ = filter(L"YADIF=0:-1");\r
+                               else if(display_mode_ == display_mode::deinterlace_bob || display_mode_ == display_mode::deinterlace_bob_reinterlace)\r
+                                       filter_ = filter(L"YADIF=1:-1");\r
+                       }\r
+                       else\r
+                               display_mode_ = display_mode::simple;\r
+\r
+                       if(display_mode_ == display_mode::invalid)\r
+                       {\r
+                               CASPAR_LOG(warning) << L"[frame_muxer] Failed to detect display-mode.";\r
+                               display_mode_ = display_mode::simple;\r
+                       }\r
+\r
+                       CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode_);\r
+               }\r
+\r
+                               \r
+               if(hints & core::frame_producer::ALPHA_HINT)\r
+                       video_frame->format = make_alpha_format(video_frame->format);\r
+               \r
+               auto format = video_frame->format;\r
+               if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+                       video_frame->format = PIX_FMT_GRAY8;\r
+\r
+               BOOST_FOREACH(auto& av_frame, filter_.execute(video_frame))\r
+               {\r
+                       av_frame->format = format;\r
+\r
+                       auto frame = make_write_frame(this, av_frame, frame_factory_, hints);\r
+\r
+                       // Fix field-order if needed\r
+                       if(frame->get_type() == core::field_mode::lower && format_desc_.field_mode == core::field_mode::upper)\r
+                               frame->get_frame_transform().fill_translation[1] += 1.0/static_cast<double>(format_desc_.height);\r
+                       else if(frame->get_type() == core::field_mode::upper && format_desc_.field_mode == core::field_mode::lower)\r
+                               frame->get_frame_transform().fill_translation[1] -= 1.0/static_cast<double>(format_desc_.height);\r
+\r
+                       video_streams_.back().push(frame);\r
+                       ++video_frame_count_;\r
+               }\r
+\r
+               if(video_streams_.back().size() > 8)\r
+                       BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+       }\r
+\r
+       void push(std::shared_ptr<core::audio_buffer> audio_samples)\r
+       {\r
+               if(audio_samples == loop_audio())       \r
+               {\r
+                       CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
+                       audio_streams_.push_back(core::audio_buffer());\r
+                       audio_sample_count_ = 0;\r
+                       return;\r
+               }\r
+\r
+               if(audio_samples == empty_audio())              \r
+                       audio_samples = std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame);             \r
+\r
+               audio_sample_count_ += audio_samples->size();\r
+\r
+               boost::range::push_back(audio_streams_.back(), *audio_samples);\r
+\r
+               if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
+                       BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+       }\r
+       \r
+       size_t size() const\r
+       {\r
+               return frame_buffer_.size();\r
+       }\r
+\r
+       safe_ptr<core::write_frame> pop_video()\r
+       {\r
+               auto frame = video_streams_.front().front();\r
+               video_streams_.front().pop();\r
+               \r
+               return frame;\r
+       }\r
+\r
+       core::audio_buffer pop_audio()\r
+       {\r
+               CASPAR_VERIFY(audio_streams_.front().size() >= format_desc_.audio_samples_per_frame);\r
+\r
+               auto begin = audio_streams_.front().begin();\r
+               auto end   = begin + format_desc_.audio_samples_per_frame;\r
+\r
+               auto samples = core::audio_buffer(begin, end);\r
+               audio_streams_.front().erase(begin, end);\r
+\r
+               return samples;\r
+       }\r
+       \r
+       bool video_ready() const\r
+       {               \r
+               return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2());\r
+       }\r
+       \r
+       bool audio_ready() const\r
+       {\r
+               return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2());\r
+       }\r
+\r
+       bool video_ready2() const\r
+       {               \r
+               switch(display_mode_)\r
+               {\r
+               case display_mode::deinterlace_bob_reinterlace:                                 \r
+               case display_mode::interlace:                                   \r
+                       return video_streams_.front().size() >= 2;\r
+               default:                                                                                \r
+                       return !video_streams_.front().empty();\r
+               }\r
+       }\r
+       \r
+       bool audio_ready2() const\r
+       {\r
+               switch(display_mode_)\r
+               {\r
+               case display_mode::duplicate:                                   \r
+                       return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame;\r
+               default:                                                                                \r
+                       return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame;\r
+               }\r
+       }\r
+               \r
+       void commit()\r
+       {\r
+               if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2()))\r
+               {\r
+                       if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
+                               CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
+\r
+                       video_streams_.pop_front();\r
+                       audio_streams_.pop_front();\r
+               }\r
+\r
+               if(!video_ready2() || !audio_ready2())\r
+                       return;\r
+               \r
+               switch(display_mode_)\r
+               {\r
+               case display_mode::simple:                                              return simple(frame_buffer_);\r
+               case display_mode::duplicate:                                   return duplicate(frame_buffer_);\r
+               case display_mode::half:                                                return half(frame_buffer_);\r
+               case display_mode::interlace:                                   return interlace(frame_buffer_);\r
+               case display_mode::deinterlace_bob:                             return simple(frame_buffer_);\r
+               case display_mode::deinterlace_bob_reinterlace: return interlace(frame_buffer_);\r
+               case display_mode::deinterlace:                                 return simple(frame_buffer_);\r
+               default:                                                                                BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+               }\r
+       }\r
+       \r
+       void simple(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {               \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+\r
+               dest.push_back(frame1);         \r
+       }\r
+\r
+       void duplicate(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {               \r
+               auto frame = pop_video();\r
+\r
+               auto frame1 = make_safe<core::write_frame>(*frame); // make a copy\r
+               frame1->audio_data() = pop_audio();\r
+\r
+               auto frame2 = frame;\r
+               frame2->audio_data() = pop_audio();\r
+\r
+               dest.push_back(frame1);\r
+               dest.push_back(frame2);\r
+       }\r
+\r
+       void half(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {                                                       \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+                               \r
+               video_streams_.front().pop(); // Throw away\r
+\r
+               dest.push_back(frame1);\r
+       }\r
+       \r
+       void interlace(std::deque<safe_ptr<basic_frame>>& dest)\r
+       {                               \r
+               auto frame1 = pop_video();\r
+               frame1->audio_data() = pop_audio();\r
+                               \r
+               auto frame2 = pop_video();\r
+\r
+               dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode));          \r
+       }\r
+       \r
+       int64_t calc_nb_frames(int64_t nb_frames) const\r
+       {\r
+               switch(display_mode_)\r
+               {\r
+               case display_mode::interlace:\r
+               case display_mode::half:\r
+                       return nb_frames/2;\r
+               case display_mode::duplicate:\r
+               case display_mode::deinterlace_bob:\r
+                       return nb_frames*2;\r
+               default:\r
+                       return nb_frames;\r
+               }\r
+       }\r
+};\r
+\r
+frame_muxer2::frame_muxer2(token_t& active_token,\r
+                                                  video_source_t& video_source, \r
+                                                  audio_source_t& audio_source,\r
+                                                  target_t& target,\r
+                                                  double in_fps, \r
+                                                  const safe_ptr<core::frame_factory>& frame_factory)\r
+       : impl_(new implementation(active_token, video_source, audio_source, target, in_fps, frame_factory))\r
+{\r
+}\r
+int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
+{\r
+       return impl_->calc_nb_frames(nb_frames);\r
+}\r
+\r
 }}
\ No newline at end of file
index 8aa79834c0ce2f081d1ec9f8289bd440591e8ceb..8c43ff5b2ab1e6b704f07b5736542dc3d25411e7 100644 (file)
@@ -6,6 +6,7 @@
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <agents.h>\r
 #include <vector>\r
 \r
 struct AVFrame;\r
@@ -46,4 +47,26 @@ private:
        safe_ptr<implementation> impl_;\r
 };\r
 \r
+class frame_muxer2 : boost::noncopyable\r
+{\r
+public:\r
+       \r
+       typedef Concurrency::ISource<bool>                                                                      token_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<AVFrame>>                          video_source_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>>       audio_source_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<core::basic_frame>>        target_t;\r
+\r
+       frame_muxer2(token_t& active_token,\r
+                                video_source_t& video_source,\r
+                                audio_source_t& audio_source, \r
+                                target_t& target,\r
+                                double in_fps, \r
+                                const safe_ptr<core::frame_factory>& frame_factory);\r
+       \r
+       int64_t calc_nb_frames(int64_t nb_frames) const;\r
+private:\r
+       struct implementation;\r
+       safe_ptr<implementation> impl_;\r
+};\r
+\r
 }}
\ No newline at end of file
index c8b32cd4a0fc9f8b3d03b6d9ef2827a3ae5706c3..2627be0a833925a160a6566e77b15fba52c9ab5f 100644 (file)
 #include "../stdafx.h"\r
 \r
 #include "input.h"\r
+#include "util.h"\r
 #include "../ffmpeg_error.h"\r
 #include "../tbb_avcodec.h"\r
 \r
 #include <core/video_format.h>\r
 \r
+#include <common/concrt/scoped_oversubscription_token.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/exception/exceptions.h>\r
 #include <common/exception/win32_exception.h>\r
 \r
-#include <tbb/concurrent_queue.h>\r
 #include <tbb/atomic.h>\r
 \r
 #include <boost/range/algorithm.hpp>\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-#include <boost/thread/thread.hpp>\r
+\r
+#include <agents.h>\r
 \r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
@@ -55,13 +55,15 @@ extern "C"
 #pragma warning (pop)\r
 #endif\r
 \r
+using namespace Concurrency;\r
+\r
 namespace caspar { namespace ffmpeg {\r
 \r
 static const size_t MAX_BUFFER_COUNT = 100;\r
 static const size_t MIN_BUFFER_COUNT = 4;\r
 static const size_t MAX_BUFFER_SIZE  = 16 * 1000000;\r
        \r
-struct input::implementation : boost::noncopyable\r
+struct input::implementation : public Concurrency::agent, boost::noncopyable\r
 {              \r
        std::shared_ptr<AVFormatContext>                                                        format_context_; // Destroy this last\r
        int                                                                                                                     default_stream_index_;\r
@@ -74,27 +76,35 @@ struct input::implementation : boost::noncopyable
        const size_t                                                                                            length_;\r
        size_t                                                                                                          frame_number_;\r
        \r
-       tbb::concurrent_bounded_queue<std::shared_ptr<AVPacket>>        buffer_;\r
-       tbb::atomic<size_t>                                                                                     buffer_size_;\r
-       boost::condition_variable                                                                       buffer_cond_;\r
-       boost::mutex                                                                                            buffer_mutex_;\r
+       input::token_t&                                                                                         active_token_;\r
+       input::target_t&                                                                                        video_target_;\r
+       input::target_t&                                                                                        audio_target_;\r
                \r
-       boost::thread                                                                                           thread_;\r
-       tbb::atomic<bool>                                                                                       is_running_;\r
-\r
        tbb::atomic<size_t>                                                                                     nb_frames_;\r
        tbb::atomic<size_t>                                                                                     nb_loops_;\r
 \r
+       int                                                                                                                     video_index_;\r
+       int                                                                                                                     audio_index_;\r
+\r
 public:\r
-       explicit implementation(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
-               : graph_(graph)\r
+       explicit implementation(input::token_t& active_token,\r
+                                                       input::target_t& video_target,\r
+                                                       input::target_t& audio_target,\r
+                                                       const safe_ptr<diagnostics::graph>& graph, \r
+                                                       const std::wstring& filename, \r
+                                                       bool loop, \r
+                                                       size_t start,\r
+                                                       size_t length)\r
+               : active_token_(active_token)\r
+               , video_target_(video_target)\r
+               , audio_target_(audio_target)\r
+               , graph_(graph)\r
                , loop_(loop)\r
                , filename_(filename)\r
                , start_(start)\r
                , length_(length)\r
                , frame_number_(0)\r
        {                       \r
-               is_running_ = true;\r
                nb_frames_      = 0;\r
                nb_loops_       = 0;\r
                \r
@@ -108,96 +118,58 @@ public:
                THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print());\r
                \r
                default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print());\r
-\r
+               video_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0);\r
+               audio_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0);\r
+               \r
                if(start_ > 0)                  \r
                        seek_frame(start_);\r
                \r
-               for(int n = 0; n < 16 && !full(); ++n)\r
+               for(int n = 0; n < 16; ++n)\r
                        read_next_packet();\r
                                                \r
                graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));        \r
-               graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
-               graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
 \r
-               thread_ = boost::thread([this]{run();});\r
+               agent::start();\r
        }\r
 \r
        ~implementation()\r
        {\r
-               is_running_ = false;\r
-               buffer_cond_.notify_all();\r
-               thread_.join();\r
-       }\r
-               \r
-       bool try_pop(std::shared_ptr<AVPacket>& packet)\r
-       {\r
-               const bool result = buffer_.try_pop(packet);\r
-\r
-               if(result)\r
-               {\r
-                       if(packet)\r
-                               buffer_size_ -= packet->size;\r
-                       buffer_cond_.notify_all();\r
-               }\r
-\r
-               graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
-               graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
-\r
-               return result;\r
-       }\r
-\r
-       size_t nb_frames() const\r
-       {\r
-               return nb_frames_;\r
-       }\r
-\r
-       size_t nb_loops() const\r
-       {\r
-               return nb_loops_;\r
+               agent::wait(this);\r
        }\r
-\r
-private:\r
        \r
-       void run()\r
-       {               \r
-               caspar::win32_exception::install_handler();\r
-\r
+       virtual void run()\r
+       {\r
                try\r
                {\r
-                       CASPAR_LOG(info) << print() << " Thread Started.";\r
-\r
-                       while(is_running_)\r
+                       while(Concurrency::receive(active_token_))\r
                        {\r
+                               if(!read_next_packet())\r
                                {\r
-                                       boost::unique_lock<boost::mutex> lock(buffer_mutex_);\r
-                                       while(full())\r
-                                               buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20));\r
+                                       Concurrency::send(video_target_, eof_packet());\r
+                                       Concurrency::send(audio_target_, eof_packet());\r
+                                       break;\r
                                }\r
-                               read_next_packet();                     \r
-                       }\r
-\r
-                       CASPAR_LOG(info) << print() << " Thread Stopped.";\r
+                       }                               \r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       is_running_ = false;\r
-               }\r
+               }               \r
+                                               \r
+               done();\r
        }\r
-                       \r
-       void read_next_packet()\r
+\r
+       bool read_next_packet()\r
        {               \r
                int ret = 0;\r
 \r
-               std::shared_ptr<AVPacket> read_packet(new AVPacket, [](AVPacket* p)\r
+               auto read_packet = create_packet();\r
+\r
                {\r
-                       av_free_packet(p);\r
-                       delete p;\r
-               });\r
-               av_init_packet(read_packet.get());\r
+                       Concurrency::scoped_oversubcription_token oversubscribe;\r
+                       ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life.     \r
+               }\r
 \r
-               ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life.     \r
-               \r
                if(is_eof(ret))                                                                                                              \r
                {\r
                        ++nb_loops_;\r
@@ -221,11 +193,11 @@ private:
                        }       \r
                        else\r
                        {\r
-                               is_running_ = false;\r
                                CASPAR_LOG(trace) << print() << " Stopping.";\r
+                               return false;\r
                        }\r
                }\r
-               else\r
+               else if(read_packet->stream_index == video_index_ || read_packet->stream_index == audio_index_)\r
                {               \r
                        THROW_ON_ERROR(ret, print(), "av_read_frame");\r
 \r
@@ -247,24 +219,23 @@ private:
                                read_packet->size = size;\r
                                read_packet->data = data;\r
                        });\r
+       \r
+                       if(read_packet->stream_index == video_index_)\r
+                               Concurrency::send(video_target_, read_packet);\r
+                       else if(read_packet->stream_index == audio_index_)\r
+                               Concurrency::send(audio_target_, read_packet);\r
+               }       \r
 \r
-                       buffer_.try_push(read_packet);\r
-                       buffer_size_ += read_packet->size;\r
-                               \r
-                       graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
-                       graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
-               }                       \r
-       }\r
-\r
-       bool full() const\r
-       {\r
-               return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT;\r
+               return true;\r
        }\r
 \r
        void seek_frame(int64_t frame, int flags = 0)\r
        {                                                       \r
-               THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print());            \r
-               buffer_.push(nullptr);\r
+               THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print());    \r
+               auto packet = create_packet();\r
+               packet->size = 0;\r
+               Concurrency::send(video_target_, loop_packet());                \r
+               Concurrency::send(audio_target_, loop_packet());\r
        }               \r
 \r
        bool is_eof(int ret)\r
@@ -283,11 +254,31 @@ private:
        }\r
 };\r
 \r
-input::input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
-       : impl_(new implementation(graph, filename, loop, start, length)){}\r
-bool input::eof() const {return !impl_->is_running_;}\r
-bool input::try_pop(std::shared_ptr<AVPacket>& packet){return impl_->try_pop(packet);}\r
-safe_ptr<AVFormatContext> input::context(){return make_safe(impl_->format_context_);}\r
-size_t input::nb_frames() const {return impl_->nb_frames();}\r
-size_t input::nb_loops() const {return impl_->nb_loops();}\r
+input::input(token_t& active_token,  \r
+                        target_t& video_target,  \r
+                        target_t& audio_target,  \r
+                    const safe_ptr<diagnostics::graph>& graph, \r
+                        const std::wstring& filename, \r
+                        bool loop, \r
+                        size_t start, \r
+                        size_t length)\r
+       : impl_(new implementation(active_token, video_target, audio_target, graph, filename, loop, start, length))\r
+{\r
+}\r
+\r
+safe_ptr<AVFormatContext> input::context()\r
+{\r
+       return safe_ptr<AVFormatContext>(impl_->format_context_);\r
+}\r
+\r
+size_t input::nb_frames() const\r
+{\r
+       return impl_->nb_frames_;\r
+}\r
+\r
+size_t input::nb_loops() const \r
+{\r
+       return impl_->nb_loops_;\r
+}\r
+\r
 }}
\ No newline at end of file
index 6b391581f519b7ab615aa52478316632676777d4..ea7741da94e6454b9e578ee935dbe601163a9f59 100644 (file)
 \r
 #include <common/memory/safe_ptr.h>\r
 \r
+#include <agents.h>\r
 #include <memory>\r
 #include <string>\r
 \r
 #include <boost/noncopyable.hpp>\r
+#include <boost/range/iterator_range.hpp>\r
 \r
 struct AVFormatContext;\r
 struct AVPacket;\r
@@ -38,20 +40,28 @@ class graph;
 }\r
         \r
 namespace ffmpeg {\r
-\r
+                       \r
 class input : boost::noncopyable\r
 {\r
 public:\r
-       explicit input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
-\r
-       bool try_pop(std::shared_ptr<AVPacket>& packet);\r
-       bool eof() const;\r
+       \r
+       typedef Concurrency::ISource<bool>                                              token_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
 \r
+       explicit input(token_t& active_token,\r
+                                  target_t& video_target,  \r
+                                  target_t& audio_target,  \r
+                                  const safe_ptr<diagnostics::graph>& graph, \r
+                                  const std::wstring& filename, bool loop, \r
+                                  size_t start = 0, \r
+                                  size_t length = std::numeric_limits<size_t>::max());\r
+               \r
        size_t nb_frames() const;\r
        size_t nb_loops() const;\r
-\r
+       \r
        safe_ptr<AVFormatContext> context();\r
 private:\r
+       friend struct implemenation;\r
        struct implementation;\r
        std::shared_ptr<implementation> impl_;\r
 };\r
index 79fae01a8cbb9bcae4b8ed949ff3fe1a8dae64dc..a98c08737f16450882371031dda2d57e2a4c4ee7 100644 (file)
@@ -5,7 +5,7 @@
 #include "format/flv.h"\r
 \r
 #include <tbb/concurrent_unordered_map.h>\r
-#include <tbb/concurrent_queue.h>\r
+#include <concurrent_queue.h>\r
 \r
 #include <core/producer/frame/frame_transform.h>\r
 #include <core/producer/frame/frame_factory.h>\r
@@ -14,7 +14,7 @@
 \r
 #include <common/exception/exceptions.h>\r
 \r
-#include <tbb/parallel_for.h>\r
+#include <ppl.h>\r
 \r
 #include <boost/filesystem.hpp>\r
 #include <boost/lexical_cast.hpp>\r
@@ -127,7 +127,7 @@ int make_alpha_format(int format)
 \r
 safe_ptr<core::write_frame> make_write_frame(const void* tag, const safe_ptr<AVFrame>& decoded_frame, const safe_ptr<core::frame_factory>& frame_factory, int hints)\r
 {                      \r
-       static tbb::concurrent_unordered_map<size_t, tbb::concurrent_queue<std::shared_ptr<SwsContext>>> sws_contexts_;\r
+       static tbb::concurrent_unordered_map<size_t, Concurrency::concurrent_queue<std::shared_ptr<SwsContext>>> sws_contexts_;\r
        \r
        const auto width  = decoded_frame->width;\r
        const auto height = decoded_frame->height;\r
@@ -188,12 +188,10 @@ safe_ptr<core::write_frame> make_write_frame(const void* tag, const safe_ptr<AVF
                        auto decoded_linesize = decoded_frame->linesize[n];\r
                                \r
                        // Copy line by line since ffmpeg sometimes pads each line.\r
-                       tbb::affinity_partitioner ap;\r
-                       tbb::parallel_for(tbb::blocked_range<size_t>(0, static_cast<int>(desc.planes[n].height)), [&](const tbb::blocked_range<size_t>& r)\r
+                       Concurrency::parallel_for(0, static_cast<int>(desc.planes[n].height), [&](size_t y)\r
                        {\r
-                               for(size_t y = r.begin(); y != r.end(); ++y)\r
-                                       memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
-                       }, ap);\r
+                               memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
+                       });\r
 \r
                        write->commit(n);\r
                }\r
@@ -287,4 +285,64 @@ void fix_meta_data(AVFormatContext& context)
        video_context.time_base.den = static_cast<int>(closest_fps*1000000.0);\r
 }\r
 \r
+std::shared_ptr<AVPacket> create_packet()\r
+{\r
+       std::shared_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
+       {\r
+               av_free_packet(p);\r
+               delete p;\r
+       });\r
+       \r
+       av_init_packet(packet.get());\r
+       return packet;\r
+}\r
+\r
+const std::shared_ptr<AVPacket>& loop_packet()\r
+{\r
+       static auto packet1 = create_packet();\r
+       return packet1;\r
+}\r
+\r
+const std::shared_ptr<AVPacket>& eof_packet()\r
+{\r
+       static auto packet2 = create_packet();\r
+       return packet2;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& loop_video()\r
+{\r
+       static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       return frame1;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& empty_video()\r
+{\r
+       static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       return frame1;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& eof_video()\r
+{\r
+       static auto frame2 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+       return frame2;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& loop_audio()\r
+{\r
+       static auto audio1 = std::make_shared<core::audio_buffer>();\r
+       return audio1;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& empty_audio()\r
+{\r
+       static auto audio1 = std::make_shared<core::audio_buffer>();\r
+       return audio1;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& eof_audio()\r
+{\r
+       static auto audio2 = std::make_shared<core::audio_buffer>();\r
+       return audio2;\r
+}\r
+\r
 }}
\ No newline at end of file
index 7b32ff9f0b1c090c8b1a0f7b7cc41e66239c4dcc..1811413db8b46baddf79c6b4a8249ba4ecbbca10 100644 (file)
@@ -4,14 +4,25 @@
 \r
 #include <core/video_format.h>\r
 #include <core/producer/frame/pixel_format.h>\r
+#include <core/mixer/audio/audio_mixer.h>\r
 \r
-extern "C"\r
+\r
+#if defined(_MSC_VER)\r
+#pragma warning (push)\r
+#pragma warning (disable : 4244)\r
+#endif\r
+extern "C" \r
 {\r
        #include <libavutil/pixfmt.h>\r
+       #include <libavcodec/avcodec.h>\r
 }\r
+#if defined(_MSC_VER)\r
+#pragma warning (pop)\r
+#endif\r
 \r
 struct AVFrame;\r
 struct AVFormatContext;\r
+struct AVPacket;\r
 \r
 namespace caspar {\r
 \r
@@ -35,4 +46,14 @@ safe_ptr<core::write_frame> make_write_frame(const void* tag, const safe_ptr<AVF
 \r
 void fix_meta_data(AVFormatContext& context);\r
 \r
+std::shared_ptr<AVPacket> create_packet();\r
+const std::shared_ptr<AVPacket>& loop_packet();\r
+const std::shared_ptr<AVPacket>& eof_packet();\r
+const std::shared_ptr<AVFrame>& loop_video();\r
+const std::shared_ptr<AVFrame>& empty_video();\r
+const std::shared_ptr<AVFrame>& eof_video();\r
+const std::shared_ptr<core::audio_buffer>& loop_audio();\r
+const std::shared_ptr<core::audio_buffer>& empty_audio();\r
+const std::shared_ptr<core::audio_buffer>& eof_audio();\r
+\r
 }}
\ No newline at end of file
index 4e2ba9664fb22d640905e8b52af3b6ac04de1e31..cc0eaaef0d5303b0cd06cc3efd427af4488c442c 100644 (file)
 #include "../../ffmpeg_error.h"\r
 #include "../../tbb_avcodec.h"\r
 \r
-#include <core/producer/frame/frame_transform.h>\r
-#include <core/producer/frame/frame_factory.h>\r
-\r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-#include <boost/filesystem.hpp>\r
-\r
-#include <queue>\r
+#include <core/producer/frame/basic_frame.h>\r
 \r
 #if defined(_MSC_VER)\r
 #pragma warning (push)\r
@@ -50,14 +44,15 @@ extern "C"
 \r
 namespace caspar { namespace ffmpeg {\r
        \r
-struct video_decoder::implementation : boost::noncopyable\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
 {\r
-       const safe_ptr<core::frame_factory>             frame_factory_;\r
+       video_decoder::token_t&                                 active_token_;\r
+       video_decoder::source_t&                                source_;\r
+       video_decoder::target_t&                                target_;\r
+\r
        std::shared_ptr<AVCodecContext>                 codec_context_;\r
        int                                                                             index_;\r
 \r
-       std::queue<std::shared_ptr<AVPacket>>   packets_;\r
-\r
        filter                                                                  filter_;\r
 \r
        double                                                                  fps_;\r
@@ -65,15 +60,24 @@ struct video_decoder::implementation : boost::noncopyable
 \r
        size_t                                                                  width_;\r
        size_t                                                                  height_;\r
+       bool                                                                    is_progressive_;\r
 \r
 public:\r
-       explicit implementation(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter) \r
-               : frame_factory_(frame_factory)\r
-               , filter_(filter)\r
-               , fps_(frame_factory_->get_video_format_desc().fps)\r
+       explicit implementation(video_decoder::token_t& active_token,\r
+                                                       video_decoder::source_t& source,\r
+                                                       video_decoder::target_t& target,\r
+                                                       const safe_ptr<AVFormatContext>& context,\r
+                                                       double fps,\r
+                                                       const std::wstring& filter) \r
+               : active_token_(active_token)\r
+               , source_(source)\r
+               , target_(target)\r
+               , filter_(filter.empty() ? L"copy" : filter)\r
+               , fps_(fps)\r
                , nb_frames_(0)\r
                , width_(0)\r
                , height_(0)\r
+               , is_progressive_(true)\r
        {\r
                try\r
                {\r
@@ -106,72 +110,77 @@ public:
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                        CASPAR_LOG(warning) << "[video_decoder] Failed to open video-stream. Running without video.";   \r
                }\r
-       }\r
 \r
-       void push(const std::shared_ptr<AVPacket>& packet)\r
+               start();\r
+       }\r
+       \r
+       ~implementation()\r
        {\r
-               if(packet && packet->stream_index != index_)\r
-                       return;\r
-\r
-               packets_.push(packet);\r
+               agent::wait(this);\r
        }\r
-\r
-       std::vector<std::shared_ptr<AVFrame>> poll()\r
-       {               \r
-               std::vector<std::shared_ptr<AVFrame>> result;\r
-\r
-               if(packets_.empty())\r
-                       return result;\r
-\r
-               if(!codec_context_)\r
-                       return empty_poll();\r
-\r
-               auto packet = packets_.front();\r
-                                       \r
-               if(packet)\r
-               {                       \r
-                       BOOST_FOREACH(auto& frame, decode(*packet))\r
-                               boost::range::push_back(result, filter_.execute(frame));\r
-\r
-                       if(packet->size == 0)\r
-                               packets_.pop();\r
-               }\r
-               else\r
+       \r
+       virtual void run()\r
+       {\r
+               try\r
                {\r
-                       if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+                       while(Concurrency::receive(active_token_))\r
                        {\r
-                               AVPacket pkt;\r
-                               av_init_packet(&pkt);\r
-                               pkt.data = nullptr;\r
-                               pkt.size = 0;\r
-\r
-                               BOOST_FOREACH(auto& frame, decode(pkt))\r
-                                       boost::range::push_back(result, filter_.execute(frame));        \r
-                       }\r
-\r
-                       if(result.empty())\r
-                       {                                       \r
-                               packets_.pop();\r
-                               avcodec_flush_buffers(codec_context_.get());\r
-                               result.push_back(nullptr);\r
+                               auto packet = Concurrency::receive(source_);\r
+                               if(packet == eof_packet())\r
+                               {\r
+                                       Concurrency::send(target_, eof_video());\r
+                                       break;\r
+                               }\r
+\r
+                               if(packet == loop_packet())\r
+                               {       \r
+                                       if(codec_context_)\r
+                                       {\r
+                                               if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+                                               {\r
+                                                       AVPacket pkt;\r
+                                                       av_init_packet(&pkt);\r
+                                                       pkt.data = nullptr;\r
+                                                       pkt.size = 0;\r
+                               \r
+                                                       BOOST_FOREACH(auto& frame1, decode(pkt))\r
+                                                       {\r
+                                                               BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
+                                                                       Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+                                                       }       \r
+                                               }\r
+\r
+                                               avcodec_flush_buffers(codec_context_.get());\r
+                                       }\r
+\r
+                                       Concurrency::send(target_, loop_video());       \r
+                               }\r
+                               else if(!codec_context_)\r
+                               {\r
+                                       Concurrency::send(target_, empty_video());      \r
+                               }\r
+                               else\r
+                               {\r
+                                       while(packet->size > 0)\r
+                                       {\r
+                                               BOOST_FOREACH(auto& frame1, decode(*packet))\r
+                                               {\r
+                                                       BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
+                                                               Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+                                               }\r
+                                       }\r
+                               }\r
                        }\r
                }\r
-               \r
-               return result;\r
-       }\r
-\r
-       std::vector<std::shared_ptr<AVFrame>> empty_poll()\r
-       {                               \r
-               auto packet = packets_.front();\r
-               packets_.pop();\r
-\r
-               if(!packet)                     \r
-                       return boost::assign::list_of(nullptr);\r
-\r
-               std::shared_ptr<AVFrame> frame(avcodec_alloc_frame(), av_free);\r
-               frame->data[0] = nullptr;\r
+               catch(...)\r
+               {\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+               }\r
 \r
-               return boost::assign::list_of(frame);                                   \r
+               std::shared_ptr<AVPacket> packet;\r
+               Concurrency::try_receive(source_, packet);              \r
+               \r
+               done();\r
        }\r
 \r
        std::vector<std::shared_ptr<AVFrame>> decode(AVPacket& pkt)\r
@@ -193,27 +202,31 @@ public:
                if(decoded_frame->repeat_pict % 2 > 0)\r
                        CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
                \r
+               is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+\r
                return std::vector<std::shared_ptr<AVFrame>>(1 + decoded_frame->repeat_pict/2, decoded_frame);\r
        }\r
-       \r
-       bool ready() const\r
-       {\r
-               return !packets_.empty();\r
-       }\r
-       \r
+               \r
        double fps() const\r
        {\r
                return fps_;\r
        }\r
 };\r
 \r
-video_decoder::video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter) : impl_(new implementation(context, frame_factory, filter)){}\r
-void video_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-std::vector<std::shared_ptr<AVFrame>> video_decoder::poll(){return impl_->poll();}\r
-bool video_decoder::ready() const{return impl_->ready();}\r
+video_decoder::video_decoder(token_t& active_token,\r
+                                                        source_t& source,\r
+                                                        target_t& target,\r
+                                                        const safe_ptr<AVFormatContext>& context, \r
+                                                        double fps, \r
+                                                        const std::wstring& filter) \r
+       : impl_(new implementation(active_token, source, target, context, fps, filter))\r
+{\r
+}\r
+\r
 double video_decoder::fps() const{return impl_->fps();}\r
 int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
 size_t video_decoder::width() const{return impl_->width_;}\r
 size_t video_decoder::height() const{return impl_->height_;}\r
+bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
 \r
 }}
\ No newline at end of file
index 1351e881bf0f5ee35fd8b4e2ac078b30d077f4ad..2fb0b85636aed90428fc43a86483b6d7a90a475e 100644 (file)
@@ -25,6 +25,7 @@
 \r
 #include <boost/noncopyable.hpp>\r
 \r
+#include <agents.h>\r
 #include <vector>\r
 \r
 struct AVFormatContext;\r
@@ -35,7 +36,6 @@ namespace caspar {
 \r
 namespace core {\r
        struct frame_factory;\r
-       class write_frame;\r
 }\r
 \r
 namespace ffmpeg {\r
@@ -43,16 +43,23 @@ namespace ffmpeg {
 class video_decoder : boost::noncopyable\r
 {\r
 public:\r
-       explicit video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter);\r
-       \r
-       void push(const std::shared_ptr<AVPacket>& packet);\r
-       bool ready() const;\r
-       std::vector<std::shared_ptr<AVFrame>> poll();\r
-       \r
+\r
+       typedef Concurrency::ISource<bool>                                              token_t;\r
+       typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
+       typedef Concurrency::ITarget<std::shared_ptr<AVFrame>>  target_t;\r
+\r
+       explicit video_decoder(token_t& active_token,\r
+                                                  source_t& source,\r
+                                                  target_t& target,\r
+                                                  const safe_ptr<AVFormatContext>& context, \r
+                                                  double fps, \r
+                                                  const std::wstring& filter); \r
+\r
        size_t width() const;\r
        size_t height() const;\r
 \r
        int64_t nb_frames() const;\r
+       bool is_progressive() const;\r
 \r
        double fps() const;\r
 private:\r
index 71d623d607ba57ab93beed1427289070178f0dfc..1f0eb7b2a9af95db54523b6b7f695a49f5e8953d 100644 (file)
@@ -8,7 +8,8 @@
 #include <common/env.h>\r
 #include <common/utility/assert.h>\r
 \r
-#include <tbb/task.h>\r
+#include <ppl.h>\r
+\r
 #include <tbb/atomic.h>\r
 #include <tbb/parallel_for.h>\r
 #include <tbb/tbb_thread.h>\r
@@ -31,14 +32,11 @@ namespace caspar { namespace ffmpeg {
                \r
 int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)\r
 {\r
-       tbb::parallel_for(tbb::blocked_range<size_t>(0, count), [&](const tbb::blocked_range<size_t>& r)\r
+       Concurrency::parallel_for(0, count, [&](size_t n)\r
        {\r
-               for(size_t n = r.begin(); n != r.end(); ++n)            \r
-               {\r
-                       int r = func(s, reinterpret_cast<uint8_t*>(arg) + n*size);\r
-                       if(ret)\r
-                               ret[n] = r;\r
-               }\r
+               int r = func(s, reinterpret_cast<uint8_t*>(arg) + n*size);\r
+               if(ret)\r
+                       ret[n] = r;\r
        });\r
 \r
        return 0;\r
@@ -51,15 +49,12 @@ int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg
 \r
        CASPAR_ASSERT(tbb::tbb_thread::hardware_concurrency() < 16);\r
        // Note: this will probably only work when tbb::task_scheduler_init::num_threads() < 16.\r
-    tbb::parallel_for(tbb::blocked_range<int>(0, count, 2), [&](const tbb::blocked_range<int> &r)    \r
+    Concurrency::parallel_for(0, count, 2, [&](int jobnr)    \r
     {   \r
         int threadnr = counter++;   \r
-        for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr)\r
-        {   \r
-            int r = func(s, arg, jobnr, threadnr);   \r
-            if (ret)   \r
-                ret[jobnr] = r;   \r
-        }\r
+        int r = func(s, arg, jobnr, threadnr);   \r
+        if (ret)   \r
+            ret[jobnr] = r;  \r
         --counter;\r
     });   \r
 \r
index 236741a0c35e54be25021ddd7fada8605def6758..92838922f054f954372645bb4e1414e8828ef1e2 100644 (file)
@@ -10,7 +10,7 @@
     <graphs>true</graphs>\r
   </diagnostics>\r
   <consumers>\r
-    <buffer-depth>3</buffer-depth>\r
+    <buffer-depth>5</buffer-depth>\r
   </consumers>\r
   <mixers>\r
     <blend-modes>false</blend-modes>\r
     </producers>\r
     <channels>\r
       <channel>\r
-        <video-mode>1080p5000</video-mode>\r
+        <video-mode>PAL</video-mode>\r
         <consumers>\r
           <decklink>\r
             <device>1</device>\r
             <embedded-audio>true</embedded-audio>\r
+            <low-latency>true</low-latency>\r
           </decklink>\r
-          <screen></screen>\r
+          <audio></audio>\r
         </consumers>\r
       </channel>\r
     </channels>\r