</ItemDefinitionGroup>\r
<ItemGroup>\r
<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
+ <ClInclude Include="concrt\bounded_buffer.h" />\r
+ <ClInclude Include="concrt\scoped_oversubscription_token.h" />\r
<ClInclude Include="concurrency\com_context.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<Filter Include="source\compiler\vs">\r
<UniqueIdentifier>{28c25c8a-1277-4d2c-9e85-5af33f9938ea}</UniqueIdentifier>\r
</Filter>\r
+ <Filter Include="source\concrt">\r
+ <UniqueIdentifier>{d3ba93c7-82fb-4e69-97bc-cdbf7d6feb24}</UniqueIdentifier>\r
+ </Filter>\r
</ItemGroup>\r
<ItemGroup>\r
<ClCompile Include="exception\win32_exception.cpp">\r
<ClInclude Include="utility\move_on_copy.h">\r
<Filter>source\utility</Filter>\r
</ClInclude>\r
+ <ClInclude Include="concrt\bounded_buffer.h">\r
+ <Filter>source\concrt</Filter>\r
+ </ClInclude>\r
+ <ClInclude Include="concrt\scoped_oversubscription_token.h">\r
+ <Filter>source\concrt</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
--- /dev/null
+#pragma once\r
+\r
+#include <agents.h>\r
+\r
+namespace Concurrency\r
+{\r
+\r
+/// <summary>\r
+/// A bounded_buffer implementation. Once the capacity is reached it will save the offered message\r
+/// id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume\r
+/// any of the postponed messages. Preference is given to previously offered messages before new ones.\r
+///\r
+/// NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. \r
+/// Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer\r
+/// implementation uses a critical_section, several interlocked operations, and additional calls to async_send.\r
+/// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.\r
+/// </summary>\r
+/// <typeparam name="_Type">\r
+/// The payload type of messages stored and propagated by the buffer.\r
+/// </typeparam>\r
+template<class _Type>\r
+class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
+{\r
+public:\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the default scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ bounded_buffer(const size_t capacity)\r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target();\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the default scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_Filter">\r
+ /// A reference to a filter function.\r
+ /// </param>\r
+ bounded_buffer(const size_t capacity, filter_method const& _Filter)\r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target();\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_PScheduler">\r
+ /// A reference to a scheduler instance.\r
+ /// </param>\r
+ bounded_buffer(const size_t capacity, Scheduler& _PScheduler)\r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target(&_PScheduler);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_PScheduler">\r
+ /// A reference to a scheduler instance.\r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A reference to a filter function.\r
+ /// </param>\r
+ bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) \r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target(&_PScheduler);\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied\r
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// A reference to a schedule group.\r
+ /// </param>\r
+ bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)\r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target(NULL, &_PScheduleGroup);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates an bounded_buffer within the specified schedule group. The scheduler is implied\r
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// A reference to a schedule group.\r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A reference to a filter function.\r
+ /// </param>\r
+ bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
+ : _M_capacity(capacity), _M_currentSize(0)\r
+ {\r
+ initialize_source_and_target(NULL, &_PScheduleGroup);\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Cleans up any resources that may have been used by the bounded_buffer.\r
+ /// </summary>\r
+ ~bounded_buffer()\r
+ {\r
+ // Remove all links\r
+ remove_network_links();\r
+ }\r
+\r
+ /// <summary>\r
+ /// Add an item to the bounded_buffer.\r
+ /// </summary>\r
+ /// <param name="_Item">\r
+ /// A reference to the item to add.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A boolean indicating whether the data was accepted.\r
+ /// </returns>\r
+ bool enqueue(_Type const& _Item)\r
+ {\r
+ return Concurrency::send<_Type>(this, _Item);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Remove an item from the bounded_buffer.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// The message payload.\r
+ /// </returns>\r
+ _Type dequeue()\r
+ {\r
+ return receive<_Type>(this);\r
+ }\r
+\r
+protected:\r
+\r
+ /// <summary>\r
+ /// The main propagate() function for ITarget blocks. Called by a source\r
+ /// block, generally within an asynchronous task to send messages to its targets.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// A pointer to the message.\r
+ /// </param>\r
+ /// <param name="_PSource">\r
+ /// A pointer to the source block offering the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// An indication of what the target decided to do with the message.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// It is important that calls to propagate do *not* take the same lock on the\r
+ /// internal structure that is used by Consume and the LWT. Doing so could\r
+ /// result in a deadlock with the Consume call. (in the case of the bounded_buffer,\r
+ /// this lock is the m_internalLock)\r
+ /// </remarks>\r
+ virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
+ {\r
+ message_status _Result = accepted;\r
+ \r
+ // Check current capacity. \r
+ if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+ {\r
+ // Postpone the message, buffer is full.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = postponed;\r
+\r
+ // Save off the message id from this source to later try\r
+ // and reserve/consume when more space is free.\r
+ {\r
+ critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+ _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
+ }\r
+\r
+ async_send(NULL);\r
+ }\r
+ else\r
+ {\r
+ //\r
+ // Accept the message being propagated\r
+ // Note: depending on the source block propagating the message\r
+ // this may not necessarily be the same message (pMessage) first\r
+ // passed into the function.\r
+ //\r
+ _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
+\r
+ if (_PMessage != NULL)\r
+ {\r
+ async_send(_PMessage);\r
+ }\r
+ else\r
+ {\r
+ // Didn't get a message so need to decrement.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = missed;\r
+ async_send(NULL);\r
+ }\r
+ }\r
+\r
+ return _Result;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Synchronously sends a message to this block. When this function completes the message will\r
+ /// already have propagated into the block.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// A pointer to the message.\r
+ /// </param>\r
+ /// <param name="_PSource">\r
+ /// A pointer to the source block offering the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// An indication of what the target decided to do with the message.\r
+ /// </returns>\r
+ virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
+ {\r
+ message_status _Result = accepted;\r
+ \r
+ // Check current capacity. \r
+ if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+ {\r
+ // Postpone the message, buffer is full.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = postponed;\r
+\r
+ // Save off the message id from this source to later try\r
+ // and reserve/consume when more space is free.\r
+ {\r
+ critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+ _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
+ }\r
+\r
+ async_send(NULL);\r
+ }\r
+ else\r
+ {\r
+ //\r
+ // Accept the message being propagated\r
+ // Note: depending on the source block propagating the message\r
+ // this may not necessarily be the same message (pMessage) first\r
+ // passed into the function.\r
+ //\r
+ _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
+\r
+ if (_PMessage != NULL)\r
+ {\r
+ async_send(_PMessage);\r
+ }\r
+ else\r
+ {\r
+ // Didn't get a message so need to decrement.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = missed;\r
+ async_send(NULL);\r
+ }\r
+ }\r
+\r
+ return _Result;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Accepts an offered message by the source, transferring ownership to the caller.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
+ {\r
+ //\r
+ // Peek at the head message in the message buffer. If the Ids match\r
+ // dequeue and transfer ownership\r
+ //\r
+ message<_Type> * _Msg = NULL;\r
+\r
+ if (_M_messageBuffer._Is_head(_MsgId))\r
+ {\r
+ _Msg = _M_messageBuffer._Dequeue();\r
+\r
+ // Give preference to any previously postponed messages\r
+ // before decrementing current size.\r
+ if(!try_consume_msg())\r
+ {\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ }\r
+ }\r
+\r
+ return _Msg;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Try to reserve and consume a message from list of saved message ids.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// True if a message was sucessfully consumed, false otherwise.\r
+ /// </returns>\r
+ bool try_consume_msg()\r
+ {\r
+ runtime_object_identity _ReservedId = -1;\r
+ ISource<_Type> * _PSource = NULL;\r
+\r
+ // Walk through source links seeing if any saved ids exist.\r
+ bool _ConsumedMsg = true;\r
+ while(_ConsumedMsg)\r
+ {\r
+ source_iterator _Iter = _M_connectedSources.begin();\r
+ {\r
+ critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+ for (; *_Iter != NULL; ++_Iter)\r
+ {\r
+ _PSource = *_Iter;\r
+ std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;\r
+ if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())\r
+ {\r
+ _ReservedId = _MapIter->second;\r
+ _M_savedSourceMsgIds.erase(_MapIter);\r
+ break;\r
+ }\r
+ }\r
+ }\r
+\r
+ // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.\r
+ if(_ReservedId != -1)\r
+ {\r
+ if(_PSource->reserve(_ReservedId, this))\r
+ {\r
+ message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);\r
+ async_send(_ConsumedMsg);\r
+ return true;\r
+ }\r
+ // Reserve failed go or link was removed, \r
+ // go back and try and find a different msg id.\r
+ else\r
+ {\r
+ continue;\r
+ }\r
+ }\r
+\r
+ // If this point is reached the map of source ids was empty.\r
+ break;\r
+ }\r
+\r
+ return false;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Reserves a message previously offered by the source.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A Boolean indicating whether the reservation worked or not.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// After 'reserve' is called, either 'consume' or 'release' must be called.\r
+ /// </remarks>\r
+ virtual bool reserve_message(runtime_object_identity _MsgId)\r
+ {\r
+ // Allow reservation if this is the head message\r
+ return _M_messageBuffer._Is_head(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Consumes a message that was reserved previously.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// Similar to 'accept', but is always preceded by a call to 'reserve'.\r
+ /// </remarks>\r
+ virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
+ {\r
+ // By default, accept the message\r
+ return accept_message(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Releases a previous message reservation.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ virtual void release_message(runtime_object_identity _MsgId)\r
+ {\r
+ // The head message is the one reserved.\r
+ if (!_M_messageBuffer._Is_head(_MsgId))\r
+ {\r
+ throw message_not_found();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Resumes propagation after a reservation has been released\r
+ /// </summary>\r
+ virtual void resume_propagation()\r
+ {\r
+ // If there are any messages in the buffer, propagate them out\r
+ if (_M_messageBuffer._Count() > 0)\r
+ {\r
+ async_send(NULL);\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Notification that a target was linked to this source.\r
+ /// </summary>\r
+ /// <param name="_PTarget">\r
+ /// A pointer to the newly linked target.\r
+ /// </param>\r
+ virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
+ {\r
+ // If the message queue is blocked due to reservation\r
+ // there is no need to do any message propagation\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ return;\r
+ }\r
+\r
+ message<_Type> * _Msg = _M_messageBuffer._Peek();\r
+\r
+ if (_Msg != NULL)\r
+ {\r
+ // Propagate the head message to the new target\r
+ message_status _Status = _PTarget->propagate(_Msg, this);\r
+\r
+ if (_Status == accepted)\r
+ {\r
+ // The target accepted the message, restart propagation.\r
+ propagate_to_any_targets(NULL);\r
+ }\r
+\r
+ // If the status is anything other than accepted, then leave\r
+ // the message queue blocked.\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Takes the message and propagates it to all the targets of this bounded_buffer.\r
+ /// This is called from async_send.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// A pointer to a new message.\r
+ /// </param>\r
+ virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
+ {\r
+ // Enqueue pMessage to the internal message buffer if it is non-NULL.\r
+ // pMessage can be NULL if this LWT was the result of a Repropagate call\r
+ // out of a Consume or Release (where no new message is queued up, but\r
+ // everything remaining in the bounded buffer needs to be propagated out)\r
+ if (_PMessage != NULL)\r
+ {\r
+ _M_messageBuffer._Enqueue(_PMessage);\r
+\r
+ // If the incoming pMessage is not the head message, we can safely assume that\r
+ // the head message is blocked and waiting on Consume(), Release() or a new\r
+ // link_target() and cannot be propagated out.\r
+ if (_M_messageBuffer._Is_head(_PMessage->msg_id()))\r
+ {\r
+ _Propagate_priority_order();\r
+ }\r
+ }\r
+ else\r
+ {\r
+ // While current size is less than capacity try to consume\r
+ // any previously offered ids.\r
+ bool _ConsumedMsg = true;\r
+ while(_ConsumedMsg)\r
+ {\r
+ // Assume a message will be found to successfully consume in the\r
+ // saved ids, if not this will be decremented afterwards.\r
+ if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+ {\r
+ break;\r
+ }\r
+\r
+ _ConsumedMsg = try_consume_msg();\r
+ }\r
+\r
+ // Decrement the current size, we broke out of the previous loop\r
+ // because we reached capacity or there were no more messages to consume.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ }\r
+ }\r
+\r
+private:\r
+\r
+ /// <summary>\r
+ /// Attempts to propagate out any messages currently in the block.\r
+ /// </summary>\r
+ void _Propagate_priority_order()\r
+ {\r
+ message<_Target_type> * _Msg = _M_messageBuffer._Peek();\r
+\r
+ // If someone has reserved the _Head message, don't propagate anymore\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ return;\r
+ }\r
+\r
+ while (_Msg != NULL)\r
+ {\r
+ message_status _Status = declined;\r
+\r
+ // Always start from the first target that linked.\r
+ for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
+ {\r
+ ITarget<_Target_type> * _PTarget = *_Iter;\r
+ _Status = _PTarget->propagate(_Msg, this);\r
+\r
+ // Ownership of message changed. Do not propagate this\r
+ // message to any other target.\r
+ if (_Status == accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // If the target just propagated to reserved this message, stop\r
+ // propagating it to others.\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ break;\r
+ }\r
+ }\r
+\r
+ // If status is anything other than accepted, then the head message\r
+ // was not propagated out. Thus, nothing after it in the queue can\r
+ // be propagated out. Cease propagation.\r
+ if (_Status != accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // Get the next message\r
+ _Msg = _M_messageBuffer._Peek();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Message buffer used to store messages.\r
+ /// </summary>\r
+ ::Concurrency::details::_Queue<message<_Type>> _M_messageBuffer;\r
+\r
+ /// <summary>\r
+ /// Maximum number of messages bounded_buffer can hold.\r
+ /// </summary>\r
+ const size_t _M_capacity;\r
+\r
+ /// <summary>\r
+ /// Current number of messages in bounded_buffer.\r
+ /// </summary>\r
+ volatile long _M_currentSize;\r
+\r
+ /// <summary>\r
+ /// Lock used to guard saved message ids map.\r
+ /// </summary>\r
+ critical_section _M_savedIdsLock;\r
+\r
+ /// <summary>\r
+ /// Map of source links to saved message ids.\r
+ /// </summary>\r
+ std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;\r
+\r
+ //\r
+ // Hide assignment operator and copy constructor\r
+ //\r
+ bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator\r
+ bounded_buffer(bounded_buffer const &); // no copy constructor\r
+};\r
+}
\ No newline at end of file
--- /dev/null
+#pragma once\r
+\r
+#include <agents.h>\r
+\r
+namespace Concurrency {\r
+\r
+/// <summary>\r
+/// An RAII style wrapper around Concurrency::Context::Oversubscribe,\r
+/// useful for annotating known blocking calls\r
+/// </summary>\r
+class scoped_oversubcription_token\r
+{\r
+public:\r
+ scoped_oversubcription_token()\r
+ {\r
+ Concurrency::Context::CurrentContext()->Oversubscribe(true);\r
+ }\r
+ ~scoped_oversubcription_token()\r
+ {\r
+ Concurrency::Context::CurrentContext()->Oversubscribe(false);\r
+ }\r
+};\r
+\r
+}
\ No newline at end of file
\r
implementation(const printer& parent_printer) \r
: parent_printer_(parent_printer)\r
- , name_(parent_printer_ ? narrow(parent_printer_()) : "")\r
+ , name_("")\r
, counter_(0){}\r
\r
void update(const std::string& name, double value)\r
implementation& operator=(implementation&);\r
};\r
\r
-graph::graph(const std::string& name) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr)\r
+graph::graph(const std::string& name, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(name) : nullptr)\r
{\r
- if(impl_)\r
- context::register_drawable(impl_);\r
+ if(start)\r
+ graph::start();\r
}\r
\r
-graph::graph(const printer& parent_printer) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr)\r
+graph::graph(const printer& parent_printer, bool start) : impl_(env::properties().get("configuration.diagnostics.graphs", true) ? new implementation(parent_printer) : nullptr)\r
{\r
+ if(start)\r
+ graph::start();\r
+}\r
+\r
+void graph::start()\r
+{ \r
if(impl_)\r
context::register_drawable(impl_);\r
}\r
}\r
}\r
\r
-safe_ptr<graph> create_graph(const std::string& name)\r
+safe_ptr<graph> create_graph(const std::string& name, bool start)\r
{\r
- return safe_ptr<graph>(new graph(name));\r
+ return safe_ptr<graph>(new graph(name, start));\r
}\r
-safe_ptr<graph> create_graph(const printer& parent_printer)\r
+safe_ptr<graph> create_graph(const printer& parent_printer, bool start)\r
{\r
- return safe_ptr<graph>(new graph(parent_printer));\r
+ return safe_ptr<graph>(new graph(parent_printer, start));\r
}\r
\r
\r
\r
class graph\r
{\r
- friend safe_ptr<graph> create_graph(const std::string& name);\r
- friend safe_ptr<graph> create_graph(const printer& parent_printer);\r
- graph(const std::string& name);\r
- graph(const printer& parent_printer);\r
+ friend safe_ptr<graph> create_graph(const std::string& name, bool start);\r
+ friend safe_ptr<graph> create_graph(const printer& parent_printer, bool start);\r
+ graph(const std::string& name, bool start = true);\r
+ graph(const printer& parent_printer, bool start = true);\r
public:\r
+ void start();\r
void update_value(const std::string& name, double value);\r
void set_value(const std::string& name, double value);\r
void set_color(const std::string& name, color c);\r
std::shared_ptr<implementation> impl_;\r
};\r
\r
-safe_ptr<graph> create_graph(const std::string& name);\r
-safe_ptr<graph> create_graph(const printer& parent_printer);\r
+safe_ptr<graph> create_graph(const std::string& name, bool start = true);\r
+safe_ptr<graph> create_graph(const printer& parent_printer, bool start = true);\r
\r
//namespace v2\r
//{\r
{\r
return make_safe<write_frame>(ogl_, tag, desc);\r
}\r
+\r
+ boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* tag, const pixel_format_desc& desc)\r
+ {\r
+ return ogl_.begin_invoke([=]{return make_safe<write_frame>(ogl_, tag, desc);}, high_priority);\r
+ }\r
};\r
\r
image_mixer::image_mixer(ogl_device& ogl, const video_format_desc& format_desc) : impl_(new implementation(ogl, format_desc)){}\r
void image_mixer::end(){impl_->end();}\r
boost::unique_future<safe_ptr<host_buffer>> image_mixer::render(){return impl_->render();}\r
safe_ptr<write_frame> image_mixer::create_frame(const void* tag, const pixel_format_desc& desc){return impl_->create_frame(tag, desc);}\r
+boost::unique_future<safe_ptr<write_frame>> image_mixer::create_frame2(const void* tag, const pixel_format_desc& desc){return impl_->create_frame2(tag, desc);}\r
void image_mixer::begin_layer(blend_mode::type blend_mode){impl_->begin_layer(blend_mode);}\r
void image_mixer::end_layer(){impl_->end_layer();}\r
image_mixer& image_mixer::operator=(image_mixer&& other)\r
#include <core/producer/frame/frame_visitor.h>\r
\r
#include <boost/noncopyable.hpp>\r
-\r
#include <boost/thread/future.hpp>\r
\r
namespace caspar { namespace core {\r
boost::unique_future<safe_ptr<host_buffer>> render();\r
\r
safe_ptr<write_frame> create_frame(const void* tag, const pixel_format_desc& format);\r
+ boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* tag, const pixel_format_desc& format);\r
\r
private:\r
struct implementation;\r
{ \r
return image_mixer_.create_frame(tag, desc);\r
}\r
+\r
+ boost::unique_future<safe_ptr<core::write_frame>> create_frame2(const void* tag, const core::pixel_format_desc& desc)\r
+ { \r
+ return image_mixer_.create_frame2(tag, desc);\r
+ }\r
\r
void set_transform(int index, const frame_transform& transform, unsigned int mix_duration, const std::wstring& tween)\r
{\r
safe_ptr<core::read_frame> mixer::execute(const std::map<int, safe_ptr<core::basic_frame>>& frames){ return impl_->execute(frames);}\r
core::video_format_desc mixer::get_video_format_desc() const { return impl_->channel_.get_format_desc(); }\r
safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); } \r
-safe_ptr<core::write_frame> mixer::create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt)\r
-{\r
- // Create bgra frame\r
- core::pixel_format_desc desc;\r
- desc.pix_fmt = pix_fmt;\r
- desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4));\r
- return create_frame(tag, desc);\r
-}\r
+boost::unique_future<safe_ptr<write_frame>> mixer::create_frame2(const void* video_stream_tag, const pixel_format_desc& desc){ return impl_->create_frame2(video_stream_tag, desc); } \r
void mixer::set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration, const std::wstring& tween){impl_->set_transform(index, transform, mix_duration, tween);}\r
void mixer::apply_frame_transform(int index, const std::function<core::frame_transform(core::frame_transform)>& transform, unsigned int mix_duration, const std::wstring& tween){impl_->apply_transform(index, transform, mix_duration, tween);}\r
void mixer::clear_transforms(){impl_->clear_transforms();}\r
\r
safe_ptr<core::read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames); // nothrow\r
\r
- safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc); \r
- safe_ptr<core::write_frame> create_frame(const void* tag, size_t width, size_t height, core::pixel_format::type pix_fmt = core::pixel_format::bgra); \r
- \r
+ safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc); \r
+ boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc);\r
+\r
core::video_format_desc get_video_format_desc() const; // nothrow\r
\r
\r
#pragma once\r
\r
#include "pixel_format.h"\r
+#include "../../video_format.h"\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concrt/scoped_oversubscription_token.h>\r
+\r
+#include <concrt.h>\r
\r
#include <boost/noncopyable.hpp>\r
+#include <boost/thread/future.hpp>\r
\r
namespace caspar { namespace core {\r
\r
class write_frame;\r
-struct pixel_format_desc;\r
struct video_format_desc;\r
\r
struct frame_factory : boost::noncopyable\r
{\r
virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, const pixel_format_desc& desc) = 0;\r
- virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra) = 0; \r
+ virtual safe_ptr<write_frame> create_frame(const void* video_stream_tag, size_t width, size_t height, pixel_format::type pix_fmt = pixel_format::bgra)\r
+ { \r
+ // Create bgra frame\r
+ core::pixel_format_desc desc;\r
+ desc.pix_fmt = pix_fmt;\r
+ desc.planes.push_back( core::pixel_format_desc::plane(width, height, 4));\r
+ return create_frame(video_stream_tag, desc);\r
+ }\r
\r
+ virtual boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc) = 0;\r
+\r
virtual video_format_desc get_video_format_desc() const = 0; // nothrow\r
};\r
+ \r
+class concrt_frame_factory : public frame_factory\r
+{ \r
+ safe_ptr<frame_factory> factory_;\r
+public:\r
+ concrt_frame_factory(const safe_ptr<frame_factory>& factory) \r
+ : factory_(factory)\r
+ {\r
+ }\r
+ \r
+ virtual safe_ptr<write_frame> create_frame(const void* tag, const pixel_format_desc& desc)\r
+ {\r
+ auto frame = factory_->create_frame2(tag, desc);\r
+\r
+ Concurrency::wait(0);\r
+\r
+ if(!frame.has_value())\r
+ {\r
+ Concurrency::scoped_oversubcription_token oversubscribe;\r
+ frame.wait();\r
+ }\r
+ return frame.get();\r
+ }\r
+\r
+ virtual boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc)\r
+ {\r
+ return factory_->create_frame2(video_stream_tag, desc);\r
+ }\r
+\r
+ virtual video_format_desc get_video_format_desc() const\r
+ {\r
+ return factory_->get_video_format_desc();\r
+ }\r
+};\r
\r
}}
\ No newline at end of file
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Wed Sep 21 22:02:53 2011\r
+/* at Tue Oct 18 00:55:44 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
\r
\r
/* File created by MIDL compiler version 7.00.0555 */\r
-/* at Wed Sep 21 22:02:53 2011\r
+/* at Tue Oct 18 00:55:44 2011\r
*/\r
/* Compiler settings for interop\DeckLinkAPI.idl:\r
Oicf, W1, Zp8, env=Win32 (32b run), target_arch=X86 7.00.0555 \r
#include <string>\r
#include <math.h>\r
\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_queue.h>\r
-#include <tbb/parallel_for.h>\r
+#include <agents.h>\r
+#include <ppl.h>\r
\r
#include <boost/assign.hpp>\r
#include <boost/filesystem.hpp>\r
#include "../../stdafx.h"\r
\r
#include "audio_decoder.h"\r
-\r
#include "audio_resampler.h"\r
\r
+#include "../util.h"\r
#include "../../ffmpeg_error.h"\r
\r
#include <core/video_format.h>\r
\r
#include <tbb/cache_aligned_allocator.h>\r
\r
-#include <queue>\r
-\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (disable : 4244)\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct audio_decoder::implementation : boost::noncopyable\r
-{ \r
+struct audio_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
+{\r
+ audio_decoder::token_t& active_token_;\r
+ audio_decoder::source_t& source_;\r
+ audio_decoder::target_t& target_;\r
+\r
std::shared_ptr<AVCodecContext> codec_context_; \r
const core::video_format_desc format_desc_;\r
int index_;\r
std::unique_ptr<audio_resampler> resampler_;\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
-\r
- std::queue<std::shared_ptr<AVPacket>> packets_;\r
-\r
+ \r
int64_t nb_frames_;\r
public:\r
- explicit implementation(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) \r
- : format_desc_(format_desc) \r
+ explicit implementation(audio_decoder::token_t& active_token,\r
+ audio_decoder::source_t& source,\r
+ audio_decoder::target_t& target,\r
+ const safe_ptr<AVFormatContext>& context, \r
+ const core::video_format_desc& format_desc) \r
+ : active_token_(active_token)\r
+ , source_(source)\r
+ , target_(target)\r
+ , format_desc_(format_desc) \r
, nb_frames_(0)\r
{ \r
try\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
CASPAR_LOG(warning) << "[audio_decoder] Failed to open audio-stream. Running without audio."; \r
}\r
- }\r
\r
- void push(const std::shared_ptr<AVPacket>& packet)\r
- { \r
- if(packet && packet->stream_index != index_)\r
- return;\r
+ start();\r
+ }\r
\r
- packets_.push(packet);\r
- } \r
- \r
- std::vector<std::shared_ptr<core::audio_buffer>> poll()\r
+ ~implementation()\r
{\r
- std::vector<std::shared_ptr<core::audio_buffer>> result;\r
-\r
- if(packets_.empty())\r
- return result;\r
-\r
- if(!codec_context_)\r
- return empty_poll();\r
- \r
- auto packet = packets_.front();\r
+ agent::wait(this);\r
+ } \r
\r
- if(packet) \r
+ virtual void run()\r
+ {\r
+ try\r
{\r
- result.push_back(decode(*packet));\r
- if(packet->size == 0) \r
- packets_.pop();\r
+ while(Concurrency::receive(active_token_))\r
+ {\r
+ auto packet = Concurrency::receive(source_);\r
+ if(packet == eof_packet())\r
+ {\r
+ Concurrency::send(target_, eof_audio());\r
+ break;\r
+ }\r
+\r
+ if(packet == loop_packet()) \r
+ { \r
+ if(codec_context_)\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ Concurrency::send(target_, loop_audio());\r
+ } \r
+ else if(!codec_context_)\r
+ Concurrency::send(target_, empty_audio()); \r
+ else \r
+ Concurrency::send(target_, decode(*packet)); \r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
- else \r
- { \r
- avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
- packets_.pop();\r
- } \r
-\r
- return result;\r
- }\r
\r
- std::vector<std::shared_ptr<core::audio_buffer>> empty_poll()\r
- {\r
- auto packet = packets_.front();\r
- packets_.pop();\r
+ std::shared_ptr<AVPacket> packet;\r
+ Concurrency::try_receive(source_, packet); \r
\r
- if(!packet) \r
- return boost::assign::list_of(nullptr);\r
- \r
- return boost::assign::list_of(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0)); \r
+ done();\r
}\r
-\r
+ \r
std::shared_ptr<core::audio_buffer> decode(AVPacket& pkt)\r
{ \r
buffer1_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
\r
return std::make_shared<core::audio_buffer>(samples, samples + n_samples);\r
}\r
-\r
- bool ready() const\r
- {\r
- return !packets_.empty();\r
- }\r
};\r
\r
-audio_decoder::audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc) : impl_(new implementation(context, format_desc)){}\r
-void audio_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-bool audio_decoder::ready() const{return impl_->ready();}\r
-std::vector<std::shared_ptr<core::audio_buffer>> audio_decoder::poll(){return impl_->poll();}\r
+audio_decoder::audio_decoder(token_t& active_token,\r
+ source_t& source,\r
+ target_t& target,\r
+ const safe_ptr<AVFormatContext>& context, \r
+ const core::video_format_desc& format_desc)\r
+ : impl_(new implementation(active_token, source, target, context, format_desc))\r
+{\r
+}\r
int64_t audio_decoder::nb_frames() const{return impl_->nb_frames_;}\r
\r
}}
\ No newline at end of file
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
#include <vector>\r
\r
struct AVPacket;\r
class audio_decoder : boost::noncopyable\r
{\r
public:\r
- explicit audio_decoder(const safe_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
- \r
- void push(const std::shared_ptr<AVPacket>& packet);\r
- bool ready() const;\r
- std::vector<std::shared_ptr<core::audio_buffer>> poll();\r
\r
+ typedef Concurrency::ISource<bool> token_t;\r
+ typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<core::audio_buffer>> target_t;\r
+\r
+ explicit audio_decoder(token_t& active_token,\r
+ source_t& source,\r
+ target_t& target,\r
+ const safe_ptr<AVFormatContext>& context, \r
+ const core::video_format_desc& format_desc);\r
+ \r
int64_t nb_frames() const;\r
\r
private:\r
+ \r
struct implementation;\r
safe_ptr<implementation> impl_;\r
};\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> resample(std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>>&& data)\r
{\r
- if(resampler_)\r
+ if(resampler_ && !data.empty())\r
{\r
buffer2_.resize(AVCODEC_MAX_AUDIO_FRAME_SIZE*2);\r
auto ret = audio_resample(resampler_.get(),\r
\r
#include <libavutil/samplefmt.h>\r
\r
+#include <tbb/cache_aligned_allocator.h>\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
class audio_resampler\r
#include "audio/audio_decoder.h"\r
#include "video/video_decoder.h"\r
\r
+#include <common/concrt/bounded_buffer.h>\r
#include <common/env.h>\r
#include <common/utility/assert.h>\r
#include <common/diagnostics/graph.h>\r
#include <boost/range/algorithm/find_if.hpp>\r
#include <boost/range/algorithm/find.hpp>\r
\r
-#include <tbb/parallel_invoke.h>\r
+#include <agents.h>\r
+#include <ppl.h>\r
\r
namespace caspar { namespace ffmpeg {\r
+\r
+template<typename T>\r
+struct buffer_alias\r
+{\r
+ typedef Concurrency::bounded_buffer<std::shared_ptr<T>> type;\r
+};\r
\r
struct ffmpeg_producer : public core::frame_producer\r
-{\r
- const std::wstring filename_;\r
- \r
- const safe_ptr<diagnostics::graph> graph_;\r
- boost::timer frame_timer_;\r
- boost::timer video_timer_;\r
- boost::timer audio_timer_;\r
+{ \r
+ const std::wstring filename_;\r
+ const int start_;\r
+ const bool loop_;\r
+ const size_t length_;\r
+\r
+ buffer_alias<AVPacket>::type video_packets_;\r
+ buffer_alias<AVPacket>::type audio_packets_;\r
+ buffer_alias<AVFrame>::type video_frames_;\r
+ buffer_alias<core::audio_buffer>::type audio_buffers_;\r
+ buffer_alias<core::basic_frame>::type muxed_frames_;\r
+ Concurrency::overwrite_buffer<bool> active_token_;\r
+ \r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
- const core::video_format_desc format_desc_;\r
-\r
- input input_; \r
- video_decoder video_decoder_;\r
- audio_decoder audio_decoder_; \r
- double fps_;\r
- frame_muxer muxer_;\r
-\r
- const int start_;\r
- const bool loop_;\r
- const size_t length_;\r
+ input input_; \r
+ video_decoder video_decoder_;\r
+ audio_decoder audio_decoder_; \r
+ frame_muxer2 muxer_;\r
\r
- safe_ptr<core::basic_frame> last_frame_;\r
-\r
- const size_t width_;\r
- const size_t height_;\r
- bool is_progressive_;\r
+ safe_ptr<core::basic_frame> last_frame_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
: filename_(filename)\r
- , graph_(diagnostics::create_graph([this]{return print();}))\r
- , frame_factory_(frame_factory) \r
- , format_desc_(frame_factory->get_video_format_desc())\r
- , input_(graph_, filename_, loop, start, length)\r
- , video_decoder_(input_.context(), frame_factory, filter)\r
- , audio_decoder_(input_.context(), frame_factory->get_video_format_desc())\r
- , fps_(video_decoder_.fps())\r
- , muxer_(fps_, frame_factory)\r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
+ , video_packets_(25)\r
+ , audio_packets_(25)\r
+ , video_frames_(2)\r
+ , audio_buffers_(2)\r
+ , muxed_frames_(2)\r
+ , graph_(diagnostics::create_graph([this]{return print();}, false))\r
+ , input_(active_token_, video_packets_, audio_packets_, graph_, filename_, loop, start, length)\r
+ , video_decoder_(active_token_, video_packets_, video_frames_, input_.context(), frame_factory->get_video_format_desc().fps, filter)\r
+ , audio_decoder_(active_token_, audio_packets_, audio_buffers_, input_.context(), frame_factory->get_video_format_desc())\r
+ , muxer_(active_token_, video_frames_, audio_buffers_, muxed_frames_, video_decoder_.fps(), frame_factory)\r
, last_frame_(core::basic_frame::empty())\r
- , width_(video_decoder_.width())\r
- , height_(video_decoder_.height())\r
- , is_progressive_(true)\r
{\r
- graph_->add_guide("frame-time", 0.5);\r
- graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f)); \r
- \r
- // Do some pre-work in order to not block rendering thread for initialization and allocations.\r
+ graph_->start();\r
\r
- push_packets();\r
- auto video_frames = video_decoder_.poll();\r
- if(!video_frames.empty())\r
- {\r
- auto& video_frame = video_frames.front();\r
- auto desc = get_pixel_format_desc(static_cast<PixelFormat>(video_frame->format), video_frame->width, video_frame->height);\r
- if(desc.pix_fmt == core::pixel_format::invalid)\r
- get_pixel_format_desc(PIX_FMT_BGRA, video_frame->width, video_frame->height);\r
- \r
- for(int n = 0; n < 3; ++n)\r
- frame_factory->create_frame(this, desc);\r
- }\r
- BOOST_FOREACH(auto& video, video_frames) \r
- muxer_.push(video, 0); \r
+ Concurrency::send(active_token_, true);\r
+ }\r
+\r
+ ~ffmpeg_producer()\r
+ {\r
+ Concurrency::send(active_token_, false);\r
+ std::shared_ptr<core::basic_frame> frame;\r
+ Concurrency::try_receive(muxed_frames_, frame);\r
}\r
- \r
+ \r
virtual safe_ptr<core::basic_frame> receive(int hints)\r
{\r
auto frame = core::basic_frame::late();\r
\r
- frame_timer_.restart();\r
- \r
- for(int n = 0; n < 64 && muxer_.empty(); ++n)\r
- decode_frame(hints);\r
- \r
- graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
-\r
- if(!muxer_.empty())\r
- frame = last_frame_ = muxer_.pop(); \r
- else\r
- {\r
- if(input_.eof())\r
- return core::basic_frame::eof();\r
- else \r
- graph_->add_tag("underflow"); \r
+ try\r
+ { \r
+ frame = last_frame_ = safe_ptr<core::basic_frame>(Concurrency::receive(muxed_frames_, 8));\r
}\r
- \r
+ catch(Concurrency::operation_timed_out&)\r
+ { \r
+ graph_->add_tag("underflow"); \r
+ }\r
+\r
return frame;\r
}\r
\r
{\r
return disable_audio(last_frame_);\r
}\r
-\r
- void push_packets()\r
- {\r
- for(int n = 0; n < 16 && ((!muxer_.video_ready() && !video_decoder_.ready()) || (!muxer_.audio_ready() && !audio_decoder_.ready())); ++n) \r
- {\r
- std::shared_ptr<AVPacket> pkt;\r
- if(input_.try_pop(pkt))\r
- {\r
- video_decoder_.push(pkt);\r
- audio_decoder_.push(pkt);\r
- }\r
- }\r
- }\r
-\r
- void decode_frame(int hints)\r
- {\r
- push_packets();\r
- \r
- tbb::parallel_invoke(\r
- [&]\r
- {\r
- if(muxer_.video_ready())\r
- return;\r
-\r
- auto video_frames = video_decoder_.poll();\r
- BOOST_FOREACH(auto& video, video_frames) \r
- {\r
- is_progressive_ = video ? video->interlaced_frame == 0 : is_progressive_;\r
- muxer_.push(video, hints); \r
- }\r
- },\r
- [&]\r
- {\r
- if(muxer_.audio_ready())\r
- return;\r
- \r
- auto audio_samples = audio_decoder_.poll();\r
- BOOST_FOREACH(auto& audio, audio_samples)\r
- muxer_.push(audio); \r
- });\r
-\r
- muxer_.commit();\r
- }\r
-\r
- virtual int64_t nb_frames() const \r
+ \r
+ virtual int64_t nb_frames() const\r
{\r
if(loop_)\r
return std::numeric_limits<int64_t>::max();\r
virtual std::wstring print() const\r
{\r
return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"|" \r
- + boost::lexical_cast<std::wstring>(width_) + L"x" + boost::lexical_cast<std::wstring>(height_)\r
- + (is_progressive_ ? L"p" : L"i") + boost::lexical_cast<std::wstring>(is_progressive_ ? fps_ : 2.0 * fps_)\r
+ + boost::lexical_cast<std::wstring>(video_decoder_.width()) + L"x" + boost::lexical_cast<std::wstring>(video_decoder_.height())\r
+ + (video_decoder_.is_progressive() ? L"p" : L"i") + boost::lexical_cast<std::wstring>(video_decoder_.is_progressive() ? video_decoder_.fps() : 2.0 * video_decoder_.fps())\r
+ L"]";\r
}\r
};\r
#pragma warning (pop)\r
#endif\r
\r
-#include <tbb/parallel_for.h>\r
-#include <tbb/concurrent_queue.h>\r
+#include <ppl.h>\r
+#include <concurrent_queue.h>\r
\r
#include <boost/thread/once.hpp>\r
\r
\r
if(ctx.index == ctx.last_index)\r
{ \r
- tbb::parallel_for(tbb::blocked_range<size_t>(0, ctx.index), [=](const tbb::blocked_range<size_t>& r)\r
+ Concurrency::parallel_for(0, ctx.index, [=](size_t n)\r
{\r
- for(auto n = r.begin(); n != r.end(); ++n)\r
- org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode);\r
+ org_yadif_filter_line(ctx.args[n].dst, ctx.args[n].prev, ctx.args[n].cur, ctx.args[n].next, ctx.args[n].w, ctx.args[n].prefs, ctx.args[n].mrefs, ctx.args[n].parity, ctx.args[n].mode);\r
});\r
ctx.index = 0;\r
}\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-tbb::concurrent_bounded_queue<decltype(org_yadif_filter_line)> parallel_line_func_pool;\r
+Concurrency::concurrent_queue<decltype(org_yadif_filter_line)> parallel_line_func_pool;\r
std::array<parallel_yadif_context, 18> ctxs;\r
\r
#define RENAME(a) f ## a\r
#include <boost/foreach.hpp>\r
#include <boost/range/algorithm_ext/push_back.hpp>\r
\r
+#include <agents.h>\r
+#include <ppl.h>\r
+\r
#include <deque>\r
#include <queue>\r
#include <vector>\r
bool frame_muxer::audio_ready() const{return impl_->audio_ready();}\r
int64_t frame_muxer::calc_nb_frames(int64_t nb_frames) const {return impl_->calc_nb_frames(nb_frames);}\r
\r
+\r
+struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
+{ \r
+ frame_muxer2::token_t& active_token_;\r
+ frame_muxer2::video_source_t& video_source_;\r
+ frame_muxer2::audio_source_t& audio_source_;\r
+ frame_muxer2::target_t& target_;\r
+\r
+ std::deque<std::queue<safe_ptr<write_frame>>> video_streams_;\r
+ std::deque<core::audio_buffer> audio_streams_;\r
+ std::deque<safe_ptr<basic_frame>> frame_buffer_;\r
+ display_mode::type display_mode_;\r
+ const double in_fps_;\r
+ const video_format_desc format_desc_;\r
+ bool auto_transcode_;\r
+\r
+ size_t audio_sample_count_;\r
+ size_t video_frame_count_;\r
+ \r
+ size_t processed_audio_sample_count_;\r
+ size_t processed_video_frame_count_;\r
+\r
+ filter filter_;\r
+ safe_ptr<core::frame_factory> frame_factory_;\r
+ \r
+ implementation(frame_muxer2::token_t& active_token,\r
+ frame_muxer2::video_source_t& video_source,\r
+ frame_muxer2::audio_source_t& audio_source,\r
+ frame_muxer2::target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory)\r
+ : active_token_(active_token)\r
+ , video_source_(video_source)\r
+ , audio_source_(audio_source)\r
+ , target_(target)\r
+ , video_streams_(1)\r
+ , audio_streams_(1)\r
+ , display_mode_(display_mode::invalid)\r
+ , in_fps_(in_fps)\r
+ , format_desc_(frame_factory->get_video_format_desc())\r
+ , auto_transcode_(env::properties().get("configuration.producers.auto-transcode", false))\r
+ , audio_sample_count_(0)\r
+ , video_frame_count_(0)\r
+ , frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
+ {\r
+ start();\r
+ }\r
+\r
+ ~implementation()\r
+ {\r
+ agent::wait(this);\r
+ }\r
+\r
+ virtual void run()\r
+ {\r
+ try\r
+ {\r
+ while(Concurrency::receive(active_token_))\r
+ {\r
+ Concurrency::parallel_invoke(\r
+ [&]\r
+ {\r
+ while(!video_ready())\r
+ {\r
+ auto video = Concurrency::receive(video_source_);\r
+ if(video == eof_video())\r
+ break;\r
+ push(video, 0); \r
+ }\r
+ },\r
+ [&]\r
+ {\r
+ while(!audio_ready())\r
+ {\r
+ auto audio = Concurrency::receive(audio_source_);\r
+ if(audio == eof_audio())\r
+ break;\r
+ push(audio); \r
+ } \r
+ });\r
+\r
+ if(!video_ready() || !audio_ready())\r
+ {\r
+ Concurrency::send(target_, std::shared_ptr<core::basic_frame>(core::basic_frame::eof()));\r
+ break;\r
+ }\r
+\r
+ commit();\r
+ \r
+ if(!frame_buffer_.empty())\r
+ {\r
+ Concurrency::send(target_, std::shared_ptr<core::basic_frame>(frame_buffer_.front()));\r
+ frame_buffer_.pop_front(); \r
+ }\r
+ }\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
+\r
+ std::shared_ptr<AVFrame> video;\r
+ Concurrency::try_receive(video_source_, video);\r
+ std::shared_ptr<core::audio_buffer> audio;\r
+ Concurrency::try_receive(audio_source_, audio);\r
+ \r
+ done();\r
+ }\r
+\r
+ void push(const std::shared_ptr<AVFrame>& video_frame, int hints)\r
+ { \r
+ if(video_frame == loop_video())\r
+ { \r
+ CASPAR_LOG(debug) << L"video-frame-count: " << static_cast<float>(video_frame_count_);\r
+ video_frame_count_ = 0;\r
+ video_streams_.push_back(std::queue<safe_ptr<write_frame>>());\r
+ return;\r
+ }\r
+\r
+ if(video_frame == empty_video())\r
+ {\r
+ video_streams_.back().push(make_safe<core::write_frame>(this));\r
+ ++video_frame_count_;\r
+ display_mode_ = display_mode::simple;\r
+ return;\r
+ }\r
+\r
+ if(display_mode_ == display_mode::invalid)\r
+ {\r
+ if(auto_transcode_)\r
+ {\r
+ auto in_mode = get_mode(*video_frame);\r
+ display_mode_ = get_display_mode(in_mode, in_fps_, format_desc_.field_mode, format_desc_.fps);\r
+ \r
+ if(display_mode_ == display_mode::simple && in_mode != core::field_mode::progressive && format_desc_.field_mode != core::field_mode::progressive && video_frame->height != static_cast<int>(format_desc_.height))\r
+ display_mode_ = display_mode::deinterlace_bob_reinterlace; // The frame will most likely be scaled, we need to deinterlace->reinterlace \r
+ \r
+ if(display_mode_ == display_mode::deinterlace)\r
+ filter_ = filter(L"YADIF=0:-1");\r
+ else if(display_mode_ == display_mode::deinterlace_bob || display_mode_ == display_mode::deinterlace_bob_reinterlace)\r
+ filter_ = filter(L"YADIF=1:-1");\r
+ }\r
+ else\r
+ display_mode_ = display_mode::simple;\r
+\r
+ if(display_mode_ == display_mode::invalid)\r
+ {\r
+ CASPAR_LOG(warning) << L"[frame_muxer] Failed to detect display-mode.";\r
+ display_mode_ = display_mode::simple;\r
+ }\r
+\r
+ CASPAR_LOG(info) << "[frame_muxer] " << display_mode::print(display_mode_);\r
+ }\r
+\r
+ \r
+ if(hints & core::frame_producer::ALPHA_HINT)\r
+ video_frame->format = make_alpha_format(video_frame->format);\r
+ \r
+ auto format = video_frame->format;\r
+ if(video_frame->format == CASPAR_PIX_FMT_LUMA) // CASPAR_PIX_FMT_LUMA is not valid for filter, change it to GRAY8\r
+ video_frame->format = PIX_FMT_GRAY8;\r
+\r
+ BOOST_FOREACH(auto& av_frame, filter_.execute(video_frame))\r
+ {\r
+ av_frame->format = format;\r
+\r
+ auto frame = make_write_frame(this, av_frame, frame_factory_, hints);\r
+\r
+ // Fix field-order if needed\r
+ if(frame->get_type() == core::field_mode::lower && format_desc_.field_mode == core::field_mode::upper)\r
+ frame->get_frame_transform().fill_translation[1] += 1.0/static_cast<double>(format_desc_.height);\r
+ else if(frame->get_type() == core::field_mode::upper && format_desc_.field_mode == core::field_mode::lower)\r
+ frame->get_frame_transform().fill_translation[1] -= 1.0/static_cast<double>(format_desc_.height);\r
+\r
+ video_streams_.back().push(frame);\r
+ ++video_frame_count_;\r
+ }\r
+\r
+ if(video_streams_.back().size() > 8)\r
+ BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("video-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ }\r
+\r
+ void push(std::shared_ptr<core::audio_buffer> audio_samples)\r
+ {\r
+ if(audio_samples == loop_audio()) \r
+ {\r
+ CASPAR_LOG(debug) << L"audio-chunk-count: " << audio_sample_count_/format_desc_.audio_samples_per_frame;\r
+ audio_streams_.push_back(core::audio_buffer());\r
+ audio_sample_count_ = 0;\r
+ return;\r
+ }\r
+\r
+ if(audio_samples == empty_audio()) \r
+ audio_samples = std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame); \r
+\r
+ audio_sample_count_ += audio_samples->size();\r
+\r
+ boost::range::push_back(audio_streams_.back(), *audio_samples);\r
+\r
+ if(audio_streams_.back().size() > 8*format_desc_.audio_samples_per_frame)\r
+ BOOST_THROW_EXCEPTION(invalid_operation() << source_info("frame_muxer") << msg_info("audio-stream overflow. This can be caused by incorrect frame-rate. Check clip meta-data."));\r
+ }\r
+ \r
+ size_t size() const\r
+ {\r
+ return frame_buffer_.size();\r
+ }\r
+\r
+ safe_ptr<core::write_frame> pop_video()\r
+ {\r
+ auto frame = video_streams_.front().front();\r
+ video_streams_.front().pop();\r
+ \r
+ return frame;\r
+ }\r
+\r
+ core::audio_buffer pop_audio()\r
+ {\r
+ CASPAR_VERIFY(audio_streams_.front().size() >= format_desc_.audio_samples_per_frame);\r
+\r
+ auto begin = audio_streams_.front().begin();\r
+ auto end = begin + format_desc_.audio_samples_per_frame;\r
+\r
+ auto samples = core::audio_buffer(begin, end);\r
+ audio_streams_.front().erase(begin, end);\r
+\r
+ return samples;\r
+ }\r
+ \r
+ bool video_ready() const\r
+ { \r
+ return video_streams_.size() > 1 || (video_streams_.size() >= audio_streams_.size() && video_ready2());\r
+ }\r
+ \r
+ bool audio_ready() const\r
+ {\r
+ return audio_streams_.size() > 1 || (audio_streams_.size() >= video_streams_.size() && audio_ready2());\r
+ }\r
+\r
+ bool video_ready2() const\r
+ { \r
+ switch(display_mode_)\r
+ {\r
+ case display_mode::deinterlace_bob_reinterlace: \r
+ case display_mode::interlace: \r
+ return video_streams_.front().size() >= 2;\r
+ default: \r
+ return !video_streams_.front().empty();\r
+ }\r
+ }\r
+ \r
+ bool audio_ready2() const\r
+ {\r
+ switch(display_mode_)\r
+ {\r
+ case display_mode::duplicate: \r
+ return audio_streams_.front().size()/2 >= format_desc_.audio_samples_per_frame;\r
+ default: \r
+ return audio_streams_.front().size() >= format_desc_.audio_samples_per_frame;\r
+ }\r
+ }\r
+ \r
+ void commit()\r
+ {\r
+ if(video_streams_.size() > 1 && audio_streams_.size() > 1 && (!video_ready2() || !audio_ready2()))\r
+ {\r
+ if(!video_streams_.front().empty() || !audio_streams_.front().empty())\r
+ CASPAR_LOG(debug) << "Truncating: " << video_streams_.front().size() << L" video-frames, " << audio_streams_.front().size() << L" audio-samples.";\r
+\r
+ video_streams_.pop_front();\r
+ audio_streams_.pop_front();\r
+ }\r
+\r
+ if(!video_ready2() || !audio_ready2())\r
+ return;\r
+ \r
+ switch(display_mode_)\r
+ {\r
+ case display_mode::simple: return simple(frame_buffer_);\r
+ case display_mode::duplicate: return duplicate(frame_buffer_);\r
+ case display_mode::half: return half(frame_buffer_);\r
+ case display_mode::interlace: return interlace(frame_buffer_);\r
+ case display_mode::deinterlace_bob: return simple(frame_buffer_);\r
+ case display_mode::deinterlace_bob_reinterlace: return interlace(frame_buffer_);\r
+ case display_mode::deinterlace: return simple(frame_buffer_);\r
+ default: BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("invalid display-mode"));\r
+ }\r
+ }\r
+ \r
+ void simple(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+\r
+ dest.push_back(frame1); \r
+ }\r
+\r
+ void duplicate(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame = pop_video();\r
+\r
+ auto frame1 = make_safe<core::write_frame>(*frame); // make a copy\r
+ frame1->audio_data() = pop_audio();\r
+\r
+ auto frame2 = frame;\r
+ frame2->audio_data() = pop_audio();\r
+\r
+ dest.push_back(frame1);\r
+ dest.push_back(frame2);\r
+ }\r
+\r
+ void half(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+ \r
+ video_streams_.front().pop(); // Throw away\r
+\r
+ dest.push_back(frame1);\r
+ }\r
+ \r
+ void interlace(std::deque<safe_ptr<basic_frame>>& dest)\r
+ { \r
+ auto frame1 = pop_video();\r
+ frame1->audio_data() = pop_audio();\r
+ \r
+ auto frame2 = pop_video();\r
+\r
+ dest.push_back(core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode)); \r
+ }\r
+ \r
+ int64_t calc_nb_frames(int64_t nb_frames) const\r
+ {\r
+ switch(display_mode_)\r
+ {\r
+ case display_mode::interlace:\r
+ case display_mode::half:\r
+ return nb_frames/2;\r
+ case display_mode::duplicate:\r
+ case display_mode::deinterlace_bob:\r
+ return nb_frames*2;\r
+ default:\r
+ return nb_frames;\r
+ }\r
+ }\r
+};\r
+\r
+frame_muxer2::frame_muxer2(token_t& active_token,\r
+ video_source_t& video_source, \r
+ audio_source_t& audio_source,\r
+ target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory)\r
+ : impl_(new implementation(active_token, video_source, audio_source, target, in_fps, frame_factory))\r
+{\r
+}\r
+int64_t frame_muxer2::calc_nb_frames(int64_t nb_frames) const\r
+{\r
+ return impl_->calc_nb_frames(nb_frames);\r
+}\r
+\r
}}
\ No newline at end of file
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
#include <vector>\r
\r
struct AVFrame;\r
safe_ptr<implementation> impl_;\r
};\r
\r
+class frame_muxer2 : boost::noncopyable\r
+{\r
+public:\r
+ \r
+ typedef Concurrency::ISource<bool> token_t;\r
+ typedef Concurrency::ISource<std::shared_ptr<AVFrame>> video_source_t;\r
+ typedef Concurrency::ISource<std::shared_ptr<core::audio_buffer>> audio_source_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<core::basic_frame>> target_t;\r
+\r
+ frame_muxer2(token_t& active_token,\r
+ video_source_t& video_source,\r
+ audio_source_t& audio_source, \r
+ target_t& target,\r
+ double in_fps, \r
+ const safe_ptr<core::frame_factory>& frame_factory);\r
+ \r
+ int64_t calc_nb_frames(int64_t nb_frames) const;\r
+private:\r
+ struct implementation;\r
+ safe_ptr<implementation> impl_;\r
+};\r
+\r
}}
\ No newline at end of file
#include "../stdafx.h"\r
\r
#include "input.h"\r
+#include "util.h"\r
#include "../ffmpeg_error.h"\r
#include "../tbb_avcodec.h"\r
\r
#include <core/video_format.h>\r
\r
+#include <common/concrt/scoped_oversubscription_token.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
-#include <tbb/concurrent_queue.h>\r
#include <tbb/atomic.h>\r
\r
#include <boost/range/algorithm.hpp>\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-#include <boost/thread/thread.hpp>\r
+\r
+#include <agents.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace ffmpeg {\r
\r
static const size_t MAX_BUFFER_COUNT = 100;\r
static const size_t MIN_BUFFER_COUNT = 4;\r
static const size_t MAX_BUFFER_SIZE = 16 * 1000000;\r
\r
-struct input::implementation : boost::noncopyable\r
+struct input::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
std::shared_ptr<AVFormatContext> format_context_; // Destroy this last\r
int default_stream_index_;\r
const size_t length_;\r
size_t frame_number_;\r
\r
- tbb::concurrent_bounded_queue<std::shared_ptr<AVPacket>> buffer_;\r
- tbb::atomic<size_t> buffer_size_;\r
- boost::condition_variable buffer_cond_;\r
- boost::mutex buffer_mutex_;\r
+ input::token_t& active_token_;\r
+ input::target_t& video_target_;\r
+ input::target_t& audio_target_;\r
\r
- boost::thread thread_;\r
- tbb::atomic<bool> is_running_;\r
-\r
tbb::atomic<size_t> nb_frames_;\r
tbb::atomic<size_t> nb_loops_;\r
\r
+ int video_index_;\r
+ int audio_index_;\r
+\r
public:\r
- explicit implementation(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
- : graph_(graph)\r
+ explicit implementation(input::token_t& active_token,\r
+ input::target_t& video_target,\r
+ input::target_t& audio_target,\r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start,\r
+ size_t length)\r
+ : active_token_(active_token)\r
+ , video_target_(video_target)\r
+ , audio_target_(audio_target)\r
+ , graph_(graph)\r
, loop_(loop)\r
, filename_(filename)\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
{ \r
- is_running_ = true;\r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
\r
THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print());\r
\r
default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print());\r
-\r
+ video_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0);\r
+ audio_index_ = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0);\r
+ \r
if(start_ > 0) \r
seek_frame(start_);\r
\r
- for(int n = 0; n < 16 && !full(); ++n)\r
+ for(int n = 0; n < 16; ++n)\r
read_next_packet();\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
- graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
- graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
\r
- thread_ = boost::thread([this]{run();});\r
+ agent::start();\r
}\r
\r
~implementation()\r
{\r
- is_running_ = false;\r
- buffer_cond_.notify_all();\r
- thread_.join();\r
- }\r
- \r
- bool try_pop(std::shared_ptr<AVPacket>& packet)\r
- {\r
- const bool result = buffer_.try_pop(packet);\r
-\r
- if(result)\r
- {\r
- if(packet)\r
- buffer_size_ -= packet->size;\r
- buffer_cond_.notify_all();\r
- }\r
-\r
- graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
-\r
- return result;\r
- }\r
-\r
- size_t nb_frames() const\r
- {\r
- return nb_frames_;\r
- }\r
-\r
- size_t nb_loops() const\r
- {\r
- return nb_loops_;\r
+ agent::wait(this);\r
}\r
-\r
-private:\r
\r
- void run()\r
- { \r
- caspar::win32_exception::install_handler();\r
-\r
+ virtual void run()\r
+ {\r
try\r
{\r
- CASPAR_LOG(info) << print() << " Thread Started.";\r
-\r
- while(is_running_)\r
+ while(Concurrency::receive(active_token_))\r
{\r
+ if(!read_next_packet())\r
{\r
- boost::unique_lock<boost::mutex> lock(buffer_mutex_);\r
- while(full())\r
- buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20));\r
+ Concurrency::send(video_target_, eof_packet());\r
+ Concurrency::send(audio_target_, eof_packet());\r
+ break;\r
}\r
- read_next_packet(); \r
- }\r
-\r
- CASPAR_LOG(info) << print() << " Thread Stopped.";\r
+ } \r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- is_running_ = false;\r
- }\r
+ } \r
+ \r
+ done();\r
}\r
- \r
- void read_next_packet()\r
+\r
+ bool read_next_packet()\r
{ \r
int ret = 0;\r
\r
- std::shared_ptr<AVPacket> read_packet(new AVPacket, [](AVPacket* p)\r
+ auto read_packet = create_packet();\r
+\r
{\r
- av_free_packet(p);\r
- delete p;\r
- });\r
- av_init_packet(read_packet.get());\r
+ Concurrency::scoped_oversubcription_token oversubscribe;\r
+ ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
+ }\r
\r
- ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
- \r
if(is_eof(ret)) \r
{\r
++nb_loops_;\r
} \r
else\r
{\r
- is_running_ = false;\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
+ return false;\r
}\r
}\r
- else\r
+ else if(read_packet->stream_index == video_index_ || read_packet->stream_index == audio_index_)\r
{ \r
THROW_ON_ERROR(ret, print(), "av_read_frame");\r
\r
read_packet->size = size;\r
read_packet->data = data;\r
});\r
+ \r
+ if(read_packet->stream_index == video_index_)\r
+ Concurrency::send(video_target_, read_packet);\r
+ else if(read_packet->stream_index == audio_index_)\r
+ Concurrency::send(audio_target_, read_packet);\r
+ } \r
\r
- buffer_.try_push(read_packet);\r
- buffer_size_ += read_packet->size;\r
- \r
- graph_->update_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- graph_->update_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
- } \r
- }\r
-\r
- bool full() const\r
- {\r
- return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT;\r
+ return true;\r
}\r
\r
void seek_frame(int64_t frame, int flags = 0)\r
{ \r
- THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
- buffer_.push(nullptr);\r
+ THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
+ auto packet = create_packet();\r
+ packet->size = 0;\r
+ Concurrency::send(video_target_, loop_packet()); \r
+ Concurrency::send(audio_target_, loop_packet());\r
} \r
\r
bool is_eof(int ret)\r
}\r
};\r
\r
-input::input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start, size_t length) \r
- : impl_(new implementation(graph, filename, loop, start, length)){}\r
-bool input::eof() const {return !impl_->is_running_;}\r
-bool input::try_pop(std::shared_ptr<AVPacket>& packet){return impl_->try_pop(packet);}\r
-safe_ptr<AVFormatContext> input::context(){return make_safe(impl_->format_context_);}\r
-size_t input::nb_frames() const {return impl_->nb_frames();}\r
-size_t input::nb_loops() const {return impl_->nb_loops();}\r
+input::input(token_t& active_token, \r
+ target_t& video_target, \r
+ target_t& audio_target, \r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start, \r
+ size_t length)\r
+ : impl_(new implementation(active_token, video_target, audio_target, graph, filename, loop, start, length))\r
+{\r
+}\r
+\r
+safe_ptr<AVFormatContext> input::context()\r
+{\r
+ return safe_ptr<AVFormatContext>(impl_->format_context_);\r
+}\r
+\r
+size_t input::nb_frames() const\r
+{\r
+ return impl_->nb_frames_;\r
+}\r
+\r
+size_t input::nb_loops() const \r
+{\r
+ return impl_->nb_loops_;\r
+}\r
+\r
}}
\ No newline at end of file
\r
#include <common/memory/safe_ptr.h>\r
\r
+#include <agents.h>\r
#include <memory>\r
#include <string>\r
\r
#include <boost/noncopyable.hpp>\r
+#include <boost/range/iterator_range.hpp>\r
\r
struct AVFormatContext;\r
struct AVPacket;\r
}\r
\r
namespace ffmpeg {\r
-\r
+ \r
class input : boost::noncopyable\r
{\r
public:\r
- explicit input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, size_t start = 0, size_t length = std::numeric_limits<size_t>::max());\r
-\r
- bool try_pop(std::shared_ptr<AVPacket>& packet);\r
- bool eof() const;\r
+ \r
+ typedef Concurrency::ISource<bool> token_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<AVPacket>> target_t;\r
\r
+ explicit input(token_t& active_token,\r
+ target_t& video_target, \r
+ target_t& audio_target, \r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, bool loop, \r
+ size_t start = 0, \r
+ size_t length = std::numeric_limits<size_t>::max());\r
+ \r
size_t nb_frames() const;\r
size_t nb_loops() const;\r
-\r
+ \r
safe_ptr<AVFormatContext> context();\r
private:\r
+ friend struct implemenation;\r
struct implementation;\r
std::shared_ptr<implementation> impl_;\r
};\r
#include "format/flv.h"\r
\r
#include <tbb/concurrent_unordered_map.h>\r
-#include <tbb/concurrent_queue.h>\r
+#include <concurrent_queue.h>\r
\r
#include <core/producer/frame/frame_transform.h>\r
#include <core/producer/frame/frame_factory.h>\r
\r
#include <common/exception/exceptions.h>\r
\r
-#include <tbb/parallel_for.h>\r
+#include <ppl.h>\r
\r
#include <boost/filesystem.hpp>\r
#include <boost/lexical_cast.hpp>\r
\r
safe_ptr<core::write_frame> make_write_frame(const void* tag, const safe_ptr<AVFrame>& decoded_frame, const safe_ptr<core::frame_factory>& frame_factory, int hints)\r
{ \r
- static tbb::concurrent_unordered_map<size_t, tbb::concurrent_queue<std::shared_ptr<SwsContext>>> sws_contexts_;\r
+ static tbb::concurrent_unordered_map<size_t, Concurrency::concurrent_queue<std::shared_ptr<SwsContext>>> sws_contexts_;\r
\r
const auto width = decoded_frame->width;\r
const auto height = decoded_frame->height;\r
auto decoded_linesize = decoded_frame->linesize[n];\r
\r
// Copy line by line since ffmpeg sometimes pads each line.\r
- tbb::affinity_partitioner ap;\r
- tbb::parallel_for(tbb::blocked_range<size_t>(0, static_cast<int>(desc.planes[n].height)), [&](const tbb::blocked_range<size_t>& r)\r
+ Concurrency::parallel_for(0, static_cast<int>(desc.planes[n].height), [&](size_t y)\r
{\r
- for(size_t y = r.begin(); y != r.end(); ++y)\r
- memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
- }, ap);\r
+ memcpy(result + y*plane.linesize, decoded + y*decoded_linesize, plane.linesize);\r
+ });\r
\r
write->commit(n);\r
}\r
video_context.time_base.den = static_cast<int>(closest_fps*1000000.0);\r
}\r
\r
+std::shared_ptr<AVPacket> create_packet()\r
+{\r
+ std::shared_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
+ {\r
+ av_free_packet(p);\r
+ delete p;\r
+ });\r
+ \r
+ av_init_packet(packet.get());\r
+ return packet;\r
+}\r
+\r
+const std::shared_ptr<AVPacket>& loop_packet()\r
+{\r
+ static auto packet1 = create_packet();\r
+ return packet1;\r
+}\r
+\r
+const std::shared_ptr<AVPacket>& eof_packet()\r
+{\r
+ static auto packet2 = create_packet();\r
+ return packet2;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& loop_video()\r
+{\r
+ static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ return frame1;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& empty_video()\r
+{\r
+ static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ return frame1;\r
+}\r
+\r
+const std::shared_ptr<AVFrame>& eof_video()\r
+{\r
+ static auto frame2 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ return frame2;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& loop_audio()\r
+{\r
+ static auto audio1 = std::make_shared<core::audio_buffer>();\r
+ return audio1;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& empty_audio()\r
+{\r
+ static auto audio1 = std::make_shared<core::audio_buffer>();\r
+ return audio1;\r
+}\r
+\r
+const std::shared_ptr<core::audio_buffer>& eof_audio()\r
+{\r
+ static auto audio2 = std::make_shared<core::audio_buffer>();\r
+ return audio2;\r
+}\r
+\r
}}
\ No newline at end of file
\r
#include <core/video_format.h>\r
#include <core/producer/frame/pixel_format.h>\r
+#include <core/mixer/audio/audio_mixer.h>\r
\r
-extern "C"\r
+\r
+#if defined(_MSC_VER)\r
+#pragma warning (push)\r
+#pragma warning (disable : 4244)\r
+#endif\r
+extern "C" \r
{\r
#include <libavutil/pixfmt.h>\r
+ #include <libavcodec/avcodec.h>\r
}\r
+#if defined(_MSC_VER)\r
+#pragma warning (pop)\r
+#endif\r
\r
struct AVFrame;\r
struct AVFormatContext;\r
+struct AVPacket;\r
\r
namespace caspar {\r
\r
\r
void fix_meta_data(AVFormatContext& context);\r
\r
+std::shared_ptr<AVPacket> create_packet();\r
+const std::shared_ptr<AVPacket>& loop_packet();\r
+const std::shared_ptr<AVPacket>& eof_packet();\r
+const std::shared_ptr<AVFrame>& loop_video();\r
+const std::shared_ptr<AVFrame>& empty_video();\r
+const std::shared_ptr<AVFrame>& eof_video();\r
+const std::shared_ptr<core::audio_buffer>& loop_audio();\r
+const std::shared_ptr<core::audio_buffer>& empty_audio();\r
+const std::shared_ptr<core::audio_buffer>& eof_audio();\r
+\r
}}
\ No newline at end of file
#include "../../ffmpeg_error.h"\r
#include "../../tbb_avcodec.h"\r
\r
-#include <core/producer/frame/frame_transform.h>\r
-#include <core/producer/frame/frame_factory.h>\r
-\r
-#include <boost/range/algorithm_ext/push_back.hpp>\r
-#include <boost/filesystem.hpp>\r
-\r
-#include <queue>\r
+#include <core/producer/frame/basic_frame.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-struct video_decoder::implementation : boost::noncopyable\r
+struct video_decoder::implementation : public Concurrency::agent, boost::noncopyable\r
{\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
+ video_decoder::token_t& active_token_;\r
+ video_decoder::source_t& source_;\r
+ video_decoder::target_t& target_;\r
+\r
std::shared_ptr<AVCodecContext> codec_context_;\r
int index_;\r
\r
- std::queue<std::shared_ptr<AVPacket>> packets_;\r
-\r
filter filter_;\r
\r
double fps_;\r
\r
size_t width_;\r
size_t height_;\r
+ bool is_progressive_;\r
\r
public:\r
- explicit implementation(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter) \r
- : frame_factory_(frame_factory)\r
- , filter_(filter)\r
- , fps_(frame_factory_->get_video_format_desc().fps)\r
+ explicit implementation(video_decoder::token_t& active_token,\r
+ video_decoder::source_t& source,\r
+ video_decoder::target_t& target,\r
+ const safe_ptr<AVFormatContext>& context,\r
+ double fps,\r
+ const std::wstring& filter) \r
+ : active_token_(active_token)\r
+ , source_(source)\r
+ , target_(target)\r
+ , filter_(filter.empty() ? L"copy" : filter)\r
+ , fps_(fps)\r
, nb_frames_(0)\r
, width_(0)\r
, height_(0)\r
+ , is_progressive_(true)\r
{\r
try\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
CASPAR_LOG(warning) << "[video_decoder] Failed to open video-stream. Running without video."; \r
}\r
- }\r
\r
- void push(const std::shared_ptr<AVPacket>& packet)\r
+ start();\r
+ }\r
+ \r
+ ~implementation()\r
{\r
- if(packet && packet->stream_index != index_)\r
- return;\r
-\r
- packets_.push(packet);\r
+ agent::wait(this);\r
}\r
-\r
- std::vector<std::shared_ptr<AVFrame>> poll()\r
- { \r
- std::vector<std::shared_ptr<AVFrame>> result;\r
-\r
- if(packets_.empty())\r
- return result;\r
-\r
- if(!codec_context_)\r
- return empty_poll();\r
-\r
- auto packet = packets_.front();\r
- \r
- if(packet)\r
- { \r
- BOOST_FOREACH(auto& frame, decode(*packet))\r
- boost::range::push_back(result, filter_.execute(frame));\r
-\r
- if(packet->size == 0)\r
- packets_.pop();\r
- }\r
- else\r
+ \r
+ virtual void run()\r
+ {\r
+ try\r
{\r
- if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ while(Concurrency::receive(active_token_))\r
{\r
- AVPacket pkt;\r
- av_init_packet(&pkt);\r
- pkt.data = nullptr;\r
- pkt.size = 0;\r
-\r
- BOOST_FOREACH(auto& frame, decode(pkt))\r
- boost::range::push_back(result, filter_.execute(frame)); \r
- }\r
-\r
- if(result.empty())\r
- { \r
- packets_.pop();\r
- avcodec_flush_buffers(codec_context_.get());\r
- result.push_back(nullptr);\r
+ auto packet = Concurrency::receive(source_);\r
+ if(packet == eof_packet())\r
+ {\r
+ Concurrency::send(target_, eof_video());\r
+ break;\r
+ }\r
+\r
+ if(packet == loop_packet())\r
+ { \r
+ if(codec_context_)\r
+ {\r
+ if(codec_context_->codec->capabilities & CODEC_CAP_DELAY)\r
+ {\r
+ AVPacket pkt;\r
+ av_init_packet(&pkt);\r
+ pkt.data = nullptr;\r
+ pkt.size = 0;\r
+ \r
+ BOOST_FOREACH(auto& frame1, decode(pkt))\r
+ {\r
+ BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
+ Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+ } \r
+ }\r
+\r
+ avcodec_flush_buffers(codec_context_.get());\r
+ }\r
+\r
+ Concurrency::send(target_, loop_video()); \r
+ }\r
+ else if(!codec_context_)\r
+ {\r
+ Concurrency::send(target_, empty_video()); \r
+ }\r
+ else\r
+ {\r
+ while(packet->size > 0)\r
+ {\r
+ BOOST_FOREACH(auto& frame1, decode(*packet))\r
+ {\r
+ BOOST_FOREACH(auto& frame2, filter_.execute(frame1))\r
+ Concurrency::send(target_, std::shared_ptr<AVFrame>(frame2));\r
+ }\r
+ }\r
+ }\r
}\r
}\r
- \r
- return result;\r
- }\r
-\r
- std::vector<std::shared_ptr<AVFrame>> empty_poll()\r
- { \r
- auto packet = packets_.front();\r
- packets_.pop();\r
-\r
- if(!packet) \r
- return boost::assign::list_of(nullptr);\r
-\r
- std::shared_ptr<AVFrame> frame(avcodec_alloc_frame(), av_free);\r
- frame->data[0] = nullptr;\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
\r
- return boost::assign::list_of(frame); \r
+ std::shared_ptr<AVPacket> packet;\r
+ Concurrency::try_receive(source_, packet); \r
+ \r
+ done();\r
}\r
\r
std::vector<std::shared_ptr<AVFrame>> decode(AVPacket& pkt)\r
if(decoded_frame->repeat_pict % 2 > 0)\r
CASPAR_LOG(warning) << "[video_decoder]: Field repeat_pict not implemented.";\r
\r
+ is_progressive_ = decoded_frame->interlaced_frame == 0;\r
+\r
return std::vector<std::shared_ptr<AVFrame>>(1 + decoded_frame->repeat_pict/2, decoded_frame);\r
}\r
- \r
- bool ready() const\r
- {\r
- return !packets_.empty();\r
- }\r
- \r
+ \r
double fps() const\r
{\r
return fps_;\r
}\r
};\r
\r
-video_decoder::video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter) : impl_(new implementation(context, frame_factory, filter)){}\r
-void video_decoder::push(const std::shared_ptr<AVPacket>& packet){impl_->push(packet);}\r
-std::vector<std::shared_ptr<AVFrame>> video_decoder::poll(){return impl_->poll();}\r
-bool video_decoder::ready() const{return impl_->ready();}\r
+video_decoder::video_decoder(token_t& active_token,\r
+ source_t& source,\r
+ target_t& target,\r
+ const safe_ptr<AVFormatContext>& context, \r
+ double fps, \r
+ const std::wstring& filter) \r
+ : impl_(new implementation(active_token, source, target, context, fps, filter))\r
+{\r
+}\r
+\r
double video_decoder::fps() const{return impl_->fps();}\r
int64_t video_decoder::nb_frames() const{return impl_->nb_frames_;}\r
size_t video_decoder::width() const{return impl_->width_;}\r
size_t video_decoder::height() const{return impl_->height_;}\r
+bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
\r
}}
\ No newline at end of file
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
#include <vector>\r
\r
struct AVFormatContext;\r
\r
namespace core {\r
struct frame_factory;\r
- class write_frame;\r
}\r
\r
namespace ffmpeg {\r
class video_decoder : boost::noncopyable\r
{\r
public:\r
- explicit video_decoder(const safe_ptr<AVFormatContext>& context, const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filter);\r
- \r
- void push(const std::shared_ptr<AVPacket>& packet);\r
- bool ready() const;\r
- std::vector<std::shared_ptr<AVFrame>> poll();\r
- \r
+\r
+ typedef Concurrency::ISource<bool> token_t;\r
+ typedef Concurrency::ISource<std::shared_ptr<AVPacket>> source_t;\r
+ typedef Concurrency::ITarget<std::shared_ptr<AVFrame>> target_t;\r
+\r
+ explicit video_decoder(token_t& active_token,\r
+ source_t& source,\r
+ target_t& target,\r
+ const safe_ptr<AVFormatContext>& context, \r
+ double fps, \r
+ const std::wstring& filter); \r
+\r
size_t width() const;\r
size_t height() const;\r
\r
int64_t nb_frames() const;\r
+ bool is_progressive() const;\r
\r
double fps() const;\r
private:\r
#include <common/env.h>\r
#include <common/utility/assert.h>\r
\r
-#include <tbb/task.h>\r
+#include <ppl.h>\r
+\r
#include <tbb/atomic.h>\r
#include <tbb/parallel_for.h>\r
#include <tbb/tbb_thread.h>\r
\r
int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)\r
{\r
- tbb::parallel_for(tbb::blocked_range<size_t>(0, count), [&](const tbb::blocked_range<size_t>& r)\r
+ Concurrency::parallel_for(0, count, [&](size_t n)\r
{\r
- for(size_t n = r.begin(); n != r.end(); ++n) \r
- {\r
- int r = func(s, reinterpret_cast<uint8_t*>(arg) + n*size);\r
- if(ret)\r
- ret[n] = r;\r
- }\r
+ int r = func(s, reinterpret_cast<uint8_t*>(arg) + n*size);\r
+ if(ret)\r
+ ret[n] = r;\r
});\r
\r
return 0;\r
\r
CASPAR_ASSERT(tbb::tbb_thread::hardware_concurrency() < 16);\r
// Note: this will probably only work when tbb::task_scheduler_init::num_threads() < 16.\r
- tbb::parallel_for(tbb::blocked_range<int>(0, count, 2), [&](const tbb::blocked_range<int> &r) \r
+ Concurrency::parallel_for(0, count, 2, [&](int jobnr) \r
{ \r
int threadnr = counter++; \r
- for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr)\r
- { \r
- int r = func(s, arg, jobnr, threadnr); \r
- if (ret) \r
- ret[jobnr] = r; \r
- }\r
+ int r = func(s, arg, jobnr, threadnr); \r
+ if (ret) \r
+ ret[jobnr] = r; \r
--counter;\r
}); \r
\r
<graphs>true</graphs>\r
</diagnostics>\r
<consumers>\r
- <buffer-depth>3</buffer-depth>\r
+ <buffer-depth>5</buffer-depth>\r
</consumers>\r
<mixers>\r
<blend-modes>false</blend-modes>\r
</producers>\r
<channels>\r
<channel>\r
- <video-mode>1080p5000</video-mode>\r
+ <video-mode>PAL</video-mode>\r
<consumers>\r
<decklink>\r
<device>1</device>\r
<embedded-audio>true</embedded-audio>\r
+ <low-latency>true</low-latency>\r
</decklink>\r
- <screen></screen>\r
+ <audio></audio>\r
</consumers>\r
</channel>\r
</channels>\r