+//--------------------------------------------------------------------------\r
+// \r
+// Copyright (c) Microsoft Corporation. All rights reserved. \r
+// \r
+// File: agents_extras.h\r
+//\r
+// Implementation of various useful message blocks\r
+//\r
+//--------------------------------------------------------------------------\r
+\r
+#pragma once\r
+\r
+#include <agents.h>\r
+\r
+// bounded_buffer uses a map\r
+#include <map>\r
+#include <queue>\r
+\r
+namespace Concurrency\r
+{\r
+ /// <summary>\r
+ /// Simple queue class for storing messages.\r
+ /// </summary>\r
+ /// <typeparam name="_Type">\r
+ /// The payload type of messages stored in this queue.\r
+ /// </typeparam>\r
+ template <class _Type>\r
+ class MessageQueue\r
+ {\r
+ public:\r
+ typedef message<_Type> _Message;\r
+\r
+ /// <summary>\r
+ /// Constructs an initially empty queue.\r
+ /// </summary>\r
+ MessageQueue()\r
+ {\r
+ }\r
+\r
+ /// <summary>\r
+ /// Removes and deletes any messages remaining in the queue.\r
+ /// </summary>\r
+ ~MessageQueue() \r
+ {\r
+ _Message * _Msg = dequeue();\r
+ while (_Msg != NULL)\r
+ {\r
+ delete _Msg;\r
+ _Msg = dequeue();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Add an item to the queue.\r
+ /// </summary>\r
+ /// <param name="_Msg">\r
+ /// Message to add.\r
+ /// </param>\r
+ void enqueue(_Message *_Msg)\r
+ {\r
+ _M_queue.push(_Msg);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Dequeue an item from the head of queue.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Returns a pointer to the message found at the head of the queue.\r
+ /// </returns>\r
+ _Message * dequeue()\r
+ {\r
+ _Message * _Msg = NULL;\r
+\r
+ if (!_M_queue.empty())\r
+ {\r
+ _Msg = _M_queue.front();\r
+ _M_queue.pop();\r
+ }\r
+\r
+ return _Msg;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Return the item at the head of the queue, without dequeuing\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Returns a pointer to the message found at the head of the queue.\r
+ /// </returns>\r
+ _Message * peek() const\r
+ {\r
+ _Message * _Msg = NULL;\r
+\r
+ if (!_M_queue.empty())\r
+ {\r
+ _Msg = _M_queue.front();\r
+ }\r
+\r
+ return _Msg;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Returns the number of items currently in the queue.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Size of the queue.\r
+ /// </returns>\r
+ size_t count() const\r
+ {\r
+ return _M_queue.size();\r
+ }\r
+\r
+ /// <summary>\r
+ /// Checks to see if specified msg id is at the head of the queue.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// Message id to check for.\r
+ /// </param>\r
+ /// <returns>\r
+ /// True if a message with specified id is at the head, false otherwise.\r
+ /// </returns>\r
+ bool is_head(const runtime_object_identity _MsgId) const\r
+ {\r
+ _Message * _Msg = peek();\r
+ if(_Msg != NULL)\r
+ {\r
+ return _Msg->msg_id() == _MsgId;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ private:\r
+ \r
+ std::queue<_Message *> _M_queue;\r
+ };\r
+\r
+ /// <summary>\r
+ /// Simple queue implementation that takes into account priority\r
+ /// using the comparison operator <.\r
+ /// </summary>\r
+ /// <typeparam name="_Type">\r
+ /// The payload type of messages stored in this queue.\r
+ /// </typeparam>\r
+ template <class _Type>\r
+ class PriorityQueue\r
+ {\r
+ public:\r
+ /// <summary>\r
+ /// Constructs an initially empty queue.\r
+ /// </summary>\r
+ PriorityQueue() : _M_pHead(NULL), _M_count(0) {}\r
+\r
+ /// <summary>\r
+ /// Removes and deletes any messages remaining in the queue.\r
+ /// </summary>\r
+ ~PriorityQueue() \r
+ {\r
+ message<_Type> * _Msg = dequeue();\r
+ while (_Msg != NULL)\r
+ {\r
+ delete _Msg;\r
+ _Msg = dequeue();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Add an item to the queue, comparisons using the 'payload' field\r
+ /// will determine the location in the queue.\r
+ /// </summary>\r
+ /// <param name="_Msg">\r
+ /// Message to add.\r
+ /// </param>\r
+ /// <param name="fCanReplaceHead">\r
+ /// True if this new message can be inserted at the head.\r
+ /// </param>\r
+ void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true)\r
+ {\r
+ MessageNode *_Element = new MessageNode();\r
+ _Element->_M_pMsg = _Msg;\r
+\r
+ // Find location to insert.\r
+ MessageNode *pCurrent = _M_pHead;\r
+ MessageNode *pPrev = NULL;\r
+ if(!fInsertAtHead && pCurrent != NULL)\r
+ {\r
+ pPrev = pCurrent;\r
+ pCurrent = pCurrent->_M_pNext;\r
+ }\r
+ while(pCurrent != NULL)\r
+ {\r
+ if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload)\r
+ {\r
+ break;\r
+ }\r
+ pPrev = pCurrent;\r
+ pCurrent = pCurrent->_M_pNext;\r
+ }\r
+\r
+ // Insert at head.\r
+ if(pPrev == NULL)\r
+ {\r
+ _M_pHead = _Element;\r
+ }\r
+ else\r
+ {\r
+ pPrev->_M_pNext = _Element;\r
+ }\r
+\r
+ // Last item in queue.\r
+ if(pCurrent == NULL)\r
+ {\r
+ _Element->_M_pNext = NULL;\r
+ }\r
+ else\r
+ {\r
+ _Element->_M_pNext = pCurrent;\r
+ }\r
+\r
+ ++_M_count;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Dequeue an item from the head of queue.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Returns a pointer to the message found at the head of the queue.\r
+ /// </returns>\r
+ message<_Type> * dequeue()\r
+ {\r
+ if (_M_pHead == NULL) \r
+ {\r
+ return NULL;\r
+ }\r
+\r
+ MessageNode *_OldHead = _M_pHead;\r
+ message<_Type> * _Result = _OldHead->_M_pMsg;\r
+\r
+ _M_pHead = _OldHead->_M_pNext;\r
+\r
+ delete _OldHead;\r
+\r
+ if(--_M_count == 0)\r
+ {\r
+ _M_pHead = NULL;\r
+ }\r
+ return _Result;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Return the item at the head of the queue, without dequeuing\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Returns a pointer to the message found at the head of the queue.\r
+ /// </returns>\r
+ message<_Type> * peek() const\r
+ {\r
+ if(_M_count != 0)\r
+ {\r
+ return _M_pHead->_M_pMsg;\r
+ }\r
+ return NULL;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Returns the number of items currently in the queue.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// Size of the queue.\r
+ /// </returns>\r
+ size_t count() const\r
+ {\r
+ return _M_count;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Checks to see if specified msg id is at the head of the queue.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// Message id to check for.\r
+ /// </param>\r
+ /// <returns>\r
+ /// True if a message with specified id is at the head, false otherwise.\r
+ /// </returns>\r
+ bool is_head(const runtime_object_identity _MsgId) const\r
+ {\r
+ if(_M_count != 0)\r
+ {\r
+ return _M_pHead->_M_pMsg->msg_id() == _MsgId;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ private:\r
+ \r
+ // Used to store individual message nodes.\r
+ struct MessageNode\r
+ {\r
+ MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {}\r
+ message<_Type> * _M_pMsg;\r
+ MessageNode * _M_pNext;\r
+ };\r
+\r
+ // A pointer to the head of the queue.\r
+ MessageNode * _M_pHead;\r
+\r
+ // The number of elements presently stored in the queue.\r
+ size_t _M_count;\r
+ };\r
+ \r
    /// <summary>
    /// priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine
    /// order when offering to targets. Besides this it acts exactly like an unbounded_buffer.
    /// </summary>
    /// <typeparam name="_Type">
    /// The payload type of messages stored and propagated by the buffer.
    /// </typeparam>
    template<class _Type>
    class priority_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
    {
    public:

        /// <summary>
        /// Creates a priority_buffer within the default scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        /// <summary>
        /// Creates a priority_buffer within the default scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        priority_buffer(filter_method const& _Filter)
        {
            initialize_source_and_target();
            register_filter(_Filter);
        }

        /// <summary>
        /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="_PScheduler">
        /// A reference to a scheduler instance.
        /// </param>
        priority_buffer(Scheduler& _PScheduler)
        {
            initialize_source_and_target(&_PScheduler);
        }

        /// <summary>
        /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="_PScheduler">
        /// A reference to a scheduler instance.
        /// </param>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter) 
        {
            initialize_source_and_target(&_PScheduler);
            register_filter(_Filter);
        }

        /// <summary>
        /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
        /// by the schedule group.
        /// </summary>
        /// <param name="_PScheduleGroup">
        /// A reference to a schedule group.
        /// </param>
        priority_buffer(ScheduleGroup& _PScheduleGroup)
        {
            initialize_source_and_target(NULL, &_PScheduleGroup);
        }

        /// <summary>
        /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
        /// by the schedule group.
        /// </summary>
        /// <param name="_PScheduleGroup">
        /// A reference to a schedule group.
        /// </param>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
        {
            initialize_source_and_target(NULL, &_PScheduleGroup);
            register_filter(_Filter);
        }

        /// <summary>
        /// Cleans up any resources that may have been created by the priority_buffer.
        /// </summary>
        ~priority_buffer()
        {
            // Remove all links
            remove_network_links();
        }

        /// <summary>
        /// Adds an item to the priority_buffer by synchronously sending it to this block.
        /// </summary>
        /// <param name="_Item">
        /// A reference to the item to add.
        /// </param>
        /// <returns>
        /// A boolean indicating whether the data was accepted.
        /// </returns>
        bool enqueue(_Type const& _Item)
        {
            return Concurrency::send<_Type>(this, _Item);
        }

        /// <summary>
        /// Removes an item from the priority_buffer, blocking via receive until one is available.
        /// </summary>
        /// <returns>
        /// The message payload.
        /// </returns>
        _Type dequeue()
        {
            return receive<_Type>(this);
        }

    protected:

        /// <summary>
        /// The main propagate() function for ITarget blocks. Called by a source
        /// block, generally within an asynchronous task to send messages to its targets.
        /// </summary>
        /// <param name="_PMessage">
        /// A pointer to the message.
        /// </param>
        /// <param name="_PSource">
        /// A pointer to the source block offering the message.
        /// </param>
        /// <returns>
        /// accepted if ownership of the message was obtained, missed otherwise.
        /// </returns>
        /// <remarks>
        /// It is important that calls to propagate do *not* take the same lock on the
        /// internal structure that is used by Consume and the LWT. Doing so could
        /// result in a deadlock with the Consume call. (in the case of the priority_buffer,
        /// this lock is the m_internalLock)
        /// </remarks>
        virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
        {
            message_status _Result = accepted;
            //
            // Accept the message being propagated
            // Note: depending on the source block propagating the message
            // this may not necessarily be the same message (pMessage) first
            // passed into the function.
            //
            _PMessage = _PSource->accept(_PMessage->msg_id(), this);

            if (_PMessage != NULL)
            {
                // Hand the accepted message to an LWT for enqueue + propagation.
                async_send(_PMessage);
            }
            else
            {
                // The source no longer had the message; report the miss.
                _Result = missed;
            }

            return _Result;
        }

        /// <summary>
        /// Synchronously sends a message to this block. When this function completes the message will
        /// already have propagated into the block.
        /// </summary>
        /// <param name="_PMessage">
        /// A pointer to the message.
        /// </param>
        /// <param name="_PSource">
        /// A pointer to the source block offering the message.
        /// </param>
        /// <returns>
        /// accepted if ownership of the message was obtained, missed otherwise.
        /// </returns>
        virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
        {
            _PMessage = _PSource->accept(_PMessage->msg_id(), this);

            if (_PMessage != NULL)
            {
                // sync_send (unlike async_send above) completes propagation
                // before returning, which is what gives this function its
                // synchronous contract.
                sync_send(_PMessage);
            }
            else
            {
                return missed;
            }

            return accepted;
        }

        /// <summary>
        /// Accepts an offered message by the source, transferring ownership to the caller.
        /// </summary>
        /// <param name="_MsgId">
        /// The runtime object identity of the message.
        /// </param>
        /// <returns>
        /// A pointer to the message that the caller now has ownership of,
        /// or NULL if _MsgId is not the current head message.
        /// </returns>
        virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
        {
            //
            // Peek at the head message in the message buffer. If the Ids match
            // dequeue and transfer ownership
            //
            message<_Type> * _Msg = NULL;

            if (_M_messageBuffer.is_head(_MsgId))
            {
                _Msg = _M_messageBuffer.dequeue();
            }

            return _Msg;
        }

        /// <summary>
        /// Reserves a message previously offered by the source.
        /// </summary>
        /// <param name="_MsgId">
        /// The runtime object identity of the message.
        /// </param>
        /// <returns>
        /// A Boolean indicating whether the reservation worked or not.
        /// </returns>
        /// <remarks>
        /// After 'reserve' is called, either 'consume' or 'release' must be called.
        /// </remarks>
        virtual bool reserve_message(runtime_object_identity _MsgId)
        {
            // Allow reservation if this is the head message
            return _M_messageBuffer.is_head(_MsgId);
        }

        /// <summary>
        /// Consumes a message that was reserved previously.
        /// </summary>
        /// <param name="_MsgId">
        /// The runtime object identity of the message.
        /// </param>
        /// <returns>
        /// A pointer to the message that the caller now has ownership of.
        /// </returns>
        /// <remarks>
        /// Similar to 'accept', but is always preceded by a call to 'reserve'.
        /// </remarks>
        virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
        {
            // By default, accept the message
            return accept_message(_MsgId);
        }

        /// <summary>
        /// Releases a previous message reservation.
        /// </summary>
        /// <param name="_MsgId">
        /// The runtime object identity of the message.
        /// </param>
        virtual void release_message(runtime_object_identity _MsgId)
        {
            // The head message is the one reserved; releasing anything else
            // indicates a protocol violation by the caller.
            if (!_M_messageBuffer.is_head(_MsgId))
            {
                throw message_not_found();
            }
        }

        /// <summary>
        /// Resumes propagation after a reservation has been released.
        /// </summary>
        virtual void resume_propagation()
        {
            // If there are any messages in the buffer, propagate them out
            // via an LWT (async_send with NULL enqueues nothing new).
            if (_M_messageBuffer.count() > 0)
            {
                async_send(NULL);
            }
        }

        /// <summary>
        /// Notification that a target was linked to this source.
        /// </summary>
        /// <param name="_PTarget">
        /// A pointer to the newly linked target.
        /// </param>
        virtual void link_target_notification(ITarget<_Type> * _PTarget)
        {
            // If the message queue is blocked due to reservation
            // there is no need to do any message propagation
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            message<_Type> * _Msg = _M_messageBuffer.peek();

            if (_Msg != NULL)
            {
                // Propagate the head message to the new target
                message_status _Status = _PTarget->propagate(_Msg, this);

                if (_Status == accepted)
                {
                    // The target accepted the message, restart propagation.
                    propagate_to_any_targets(NULL);
                }

                // If the status is anything other than accepted, then leave
                // the message queue blocked.
            }
        }

        /// <summary>
        /// Takes the message and propagates it to all the targets of this priority_buffer.
        /// </summary>
        /// <param name="_PMessage">
        /// A pointer to a new message, or NULL to only re-propagate what is buffered.
        /// </param>
        virtual void propagate_to_any_targets(message<_Type> * _PMessage)
        {
            // Enqueue pMessage to the internal unbounded buffer queue if it is non-NULL.
            // _PMessage can be NULL if this LWT was the result of a Repropagate call
            // out of a Consume or Release (where no new message is queued up, but
            // everything remaining in the priority_buffer needs to be propagated out)
            if (_PMessage != NULL)
            {
                message<_Type> *pPrevHead = _M_messageBuffer.peek();

                // If a reservation is held make sure to not insert this new
                // message before it.
                if(_M_pReservedFor != NULL)
                {
                    _M_messageBuffer.enqueue(_PMessage, false);
                }
                else
                {
                    _M_messageBuffer.enqueue(_PMessage);
                }

                // NOTE(review): this early-return fires when the NEW message
                // displaced the previous head (is_head(pPrevHead) is now false),
                // yet the original comment claimed the opposite ("head didn't
                // change ... head is blocked"). Confirm the intended semantics
                // before altering this condition.
                if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id()))
                {
                    return;
                }
            }

            // Attempt to propagate messages to all the targets
            _Propagate_priority_order();
        }

    private:

        /// <summary>
        /// Attempts to propagate out any messages currently in the block, in
        /// priority order, stopping at the first message no target accepts.
        /// </summary>
        void _Propagate_priority_order()
        {
            // NOTE(review): _Target_type is not declared in this class; it is
            // presumably inherited from propagator_block and resolved via
            // MSVC's permissive dependent-base name lookup — confirm under
            // a conforming compiler.
            message<_Target_type> * _Msg = _M_messageBuffer.peek();

            // If someone has reserved the _Head message, don't propagate anymore
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            while (_Msg != NULL)
            {
                message_status _Status = declined;

                // Always start from the first target that linked.
                for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
                {
                    ITarget<_Target_type> * _PTarget = *_Iter;
                    _Status = _PTarget->propagate(_Msg, this);

                    // Ownership of message changed. Do not propagate this
                    // message to any other target.
                    if (_Status == accepted)
                    {
                        break;
                    }

                    // If the target just propagated to reserved this message, stop
                    // propagating it to others.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. Thus, nothing after it in the queue can
                // be propagated out. Cease propagation.
                if (_Status != accepted)
                {
                    break;
                }

                // Get the next message (an accepting target dequeued the
                // previous head via accept_message).
                _Msg = _M_messageBuffer.peek();
            }
        }

        /// <summary>
        /// Priority Queue used to store messages.
        /// </summary>
        PriorityQueue<_Type> _M_messageBuffer;

        //
        // Hide assignment operator and copy constructor.
        //
        priority_buffer const &operator =(priority_buffer const&); // no assignment operator
        priority_buffer(priority_buffer const &);                  // no copy constructor
    };
+\r
+\r
+ /// <summary>\r
+ /// A bounded_buffer implementation. Once the capacity is reached it will save the offered message\r
+ /// id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume\r
+ /// any of the postponed messages. Preference is given to previously offered messages before new ones.\r
+ ///\r
+ /// NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. \r
+ /// Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer\r
+ /// implementation uses a critical_section, several interlocked operations, and additional calls to async_send.\r
+ /// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.\r
+ /// </summary>\r
+ /// <typeparam name="_Type">\r
+ /// The payload type of messages stored and propagated by the buffer.\r
+ /// </typeparam>\r
+ template<class _Type>\r
+ class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
+ {\r
+ public:\r
        /// <summary>
        /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        bounded_buffer(const size_t capacity)
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target();
        }

        /// <summary>
        /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        bounded_buffer(const size_t capacity, filter_method const& _Filter)
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target();
            register_filter(_Filter);
        }

        /// <summary>
        /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        /// <param name="_PScheduler">
        /// A reference to a scheduler instance.
        /// </param>
        bounded_buffer(const size_t capacity, Scheduler& _PScheduler)
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target(&_PScheduler);
        }

        /// <summary>
        /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
        /// group of the scheduler's choosing.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        /// <param name="_PScheduler">
        /// A reference to a scheduler instance.
        /// </param>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) 
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target(&_PScheduler);
            register_filter(_Filter);
        }

        /// <summary>
        /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
        /// by the schedule group.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        /// <param name="_PScheduleGroup">
        /// A reference to a schedule group.
        /// </param>
        bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target(NULL, &_PScheduleGroup);
        }

        /// <summary>
        /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
        /// by the schedule group.
        /// </summary>
        /// <param name="capacity">
        /// The maximum number of messages the buffer holds before postponing new offers.
        /// </param>
        /// <param name="_PScheduleGroup">
        /// A reference to a schedule group.
        /// </param>
        /// <param name="_Filter">
        /// A reference to a filter function applied to incoming messages.
        /// </param>
        bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
            : _M_capacity(capacity), _M_currentSize(0)
        {
            initialize_source_and_target(NULL, &_PScheduleGroup);
            register_filter(_Filter);
        }

        /// <summary>
        /// Cleans up any resources that may have been used by the bounded_buffer.
        /// </summary>
        ~bounded_buffer()
        {
            // Remove all links
            remove_network_links();
        }
+\r
+ /// <summary>\r
+ /// Add an item to the bounded_buffer.\r
+ /// </summary>\r
+ /// <param name="_Item">\r
+ /// A reference to the item to add.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A boolean indicating whether the data was accepted.\r
+ /// </returns>\r
+ bool enqueue(_Type const& _Item)\r
+ {\r
+ return Concurrency::send<_Type>(this, _Item);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Remove an item from the bounded_buffer.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// The message payload.\r
+ /// </returns>\r
+ _Type dequeue()\r
+ {\r
+ return receive<_Type>(this);\r
+ }\r
+\r
+ protected:\r
+\r
        /// <summary>
        /// The main propagate() function for ITarget blocks. Called by a source
        /// block, generally within an asynchronous task to send messages to its targets.
        /// </summary>
        /// <param name="_PMessage">
        /// A pointer to the message.
        /// </param>
        /// <param name="_PSource">
        /// A pointer to the source block offering the message.
        /// </param>
        /// <returns>
        /// accepted if ownership was obtained, postponed if the buffer is at
        /// capacity, or missed if the source no longer had the message.
        /// </returns>
        /// <remarks>
        /// It is important that calls to propagate do *not* take the same lock on the
        /// internal structure that is used by Consume and the LWT. Doing so could
        /// result in a deadlock with the Consume call. (in the case of the bounded_buffer,
        /// this lock is the m_internalLock)
        /// </remarks>
        virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
        {
            message_status _Result = accepted;

            // Optimistically claim a slot with an interlocked increment; if
            // that pushes the size past capacity the claim is rolled back and
            // the message is postponed instead.
            if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
            {
                // Postpone the message, buffer is full.
                _InterlockedDecrement(&_M_currentSize);
                _Result = postponed;

                // Save off the message id from this source to later try
                // and reserve/consume when more space is free.
                {
                    critical_section::scoped_lock scopedLock(_M_savedIdsLock);
                    _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();
                }

                // Schedule an LWT so a saved id is not abandoned if space was
                // freed between the failed claim above and saving the id.
                async_send(NULL);
            }
            else
            {
                //
                // Accept the message being propagated
                // Note: depending on the source block propagating the message
                // this may not necessarily be the same message (pMessage) first
                // passed into the function.
                //
                _PMessage = _PSource->accept(_PMessage->msg_id(), this);

                if (_PMessage != NULL)
                {
                    async_send(_PMessage);
                }
                else
                {
                    // Didn't get a message so need to decrement the slot
                    // claimed above; kick an LWT to drain any saved ids.
                    _InterlockedDecrement(&_M_currentSize);
                    _Result = missed;
                    async_send(NULL);
                }
            }

            return _Result;
        }
+\r
+ /// <summary>\r
+ /// Synchronously sends a message to this block. When this function completes the message will\r
+ /// already have propagated into the block.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// A pointer to the message.\r
+ /// </param>\r
+ /// <param name="_PSource">\r
+ /// A pointer to the source block offering the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// An indication of what the target decided to do with the message.\r
+ /// </returns>\r
+ virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
+ {\r
+ message_status _Result = accepted;\r
+ \r
+ // Check current capacity. \r
+ if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
+ {\r
+ // Postpone the message, buffer is full.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = postponed;\r
+\r
+ // Save off the message id from this source to later try\r
+ // and reserve/consume when more space is free.\r
+ {\r
+ critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+ _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
+ }\r
+\r
+ async_send(NULL);\r
+ }\r
+ else\r
+ {\r
+ //\r
+ // Accept the message being propagated\r
+ // Note: depending on the source block propagating the message\r
+ // this may not necessarily be the same message (pMessage) first\r
+ // passed into the function.\r
+ //\r
+ _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
+\r
+ if (_PMessage != NULL)\r
+ {\r
+ async_send(_PMessage);\r
+ }\r
+ else\r
+ {\r
+ // Didn't get a message so need to decrement.\r
+ _InterlockedDecrement(&_M_currentSize);\r
+ _Result = missed;\r
+ async_send(NULL);\r
+ }\r
+ }\r
+\r
+ return _Result;\r
+ }\r
+\r
        /// <summary>
        /// Accepts an offered message by the source, transferring ownership to the caller.
        /// </summary>
        /// <param name="_MsgId">
        /// The runtime object identity of the message.
        /// </param>
        /// <returns>
        /// A pointer to the message that the caller now has ownership of,
        /// or NULL if _MsgId is not the current head message.
        /// </returns>
        virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
        {
            //
            // Peek at the head message in the message buffer. If the Ids match
            // dequeue and transfer ownership
            //
            message<_Type> * _Msg = NULL;

            if (_M_messageBuffer.is_head(_MsgId))
            {
                _Msg = _M_messageBuffer.dequeue();

                // Give preference to any previously postponed messages
                // before decrementing current size. If one is consumed it
                // takes over the slot just vacated, so the size stays put.
                if(!try_consume_msg())
                {
                    _InterlockedDecrement(&_M_currentSize);
                }
            }

            return _Msg;
        }
+\r
+ /// <summary>\r
+ /// Try to reserve and consume a message from list of saved message ids.\r
+ /// </summary>\r
+ /// <returns>\r
+ /// True if a message was sucessfully consumed, false otherwise.\r
+ /// </returns>\r
+ bool try_consume_msg()\r
+ {\r
+ runtime_object_identity _ReservedId = -1;\r
+ ISource<_Type> * _PSource = NULL;\r
+\r
+ // Walk through source links seeing if any saved ids exist.\r
+ bool _ConsumedMsg = true;\r
+ while(_ConsumedMsg)\r
+ {\r
+ source_iterator _Iter = _M_connectedSources.begin();\r
+ {\r
+ critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
+ for (; *_Iter != NULL; ++_Iter)\r
+ {\r
+ _PSource = *_Iter;\r
+ std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;\r
+ if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())\r
+ {\r
+ _ReservedId = _MapIter->second;\r
+ _M_savedSourceMsgIds.erase(_MapIter);\r
+ break;\r
+ }\r
+ }\r
+ }\r
+\r
+ // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.\r
+ if(_ReservedId != -1)\r
+ {\r
+ if(_PSource->reserve(_ReservedId, this))\r
+ {\r
+ message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);\r
+ async_send(_ConsumedMsg);\r
+ return true;\r
+ }\r
+ // Reserve failed go or link was removed, \r
+ // go back and try and find a different msg id.\r
+ else\r
+ {\r
+ continue;\r
+ }\r
+ }\r
+\r
+ // If this point is reached the map of source ids was empty.\r
+ break;\r
+ }\r
+\r
+ return false;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Reserves a message previously offered by the source.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A Boolean indicating whether the reservation worked or not.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// After 'reserve' is called, either 'consume' or 'release' must be called.\r
+ /// </remarks>\r
+ virtual bool reserve_message(runtime_object_identity _MsgId)\r
+ {\r
+ // Allow reservation if this is the head message\r
+ return _M_messageBuffer.is_head(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Consumes a message that was reserved previously.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// Similar to 'accept', but is always preceded by a call to 'reserve'.\r
+ /// </remarks>\r
+ virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
+ {\r
+ // By default, accept the message\r
+ return accept_message(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Releases a previous message reservation.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ virtual void release_message(runtime_object_identity _MsgId)\r
+ {\r
+ // The head message is the one reserved.\r
+ if (!_M_messageBuffer.is_head(_MsgId))\r
+ {\r
+ throw message_not_found();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Resumes propagation after a reservation has been released\r
+ /// </summary>\r
+ virtual void resume_propagation()
+ {
+ // A non-empty buffer still has messages to offer; schedule an
+ // asynchronous repropagation pass (NULL = no new message enqueued).
+ if (_M_messageBuffer.count() != 0)
+ {
+ async_send(NULL);
+ }
+ }
+\r
+ /// <summary>\r
+ /// Notification that a target was linked to this source.\r
+ /// </summary>\r
+ /// <param name="_PTarget">\r
+ /// A pointer to the newly linked target.\r
+ /// </param>\r
+ virtual void link_target_notification(ITarget<_Type> * _PTarget)
+ {
+ // An outstanding reservation pins the head of the queue; no message
+ // propagation can happen until it is consumed or released.
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+
+ message<_Type> * _Head = _M_messageBuffer.peek();
+ if (_Head == NULL)
+ {
+ return;
+ }
+
+ // Offer the head message to just the newly linked target.
+ const message_status _Outcome = _PTarget->propagate(_Head, this);
+ if (_Outcome == accepted)
+ {
+ // The new target took the head message; resume normal propagation
+ // for whatever remains buffered.
+ propagate_to_any_targets(NULL);
+ }
+
+ // Any status other than accepted leaves the message queue blocked.
+ }
+\r
+ /// <summary>\r
+ /// Takes the message and propagates it to all the targets of this bounded_buffer.\r
+ /// This is called from async_send.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// A pointer to a new message.\r
+ /// </param>\r
+ virtual void propagate_to_any_targets(message<_Type> * _PMessage)
+ {
+ // Enqueue pMessage to the internal message buffer if it is non-NULL.
+ // pMessage can be NULL if this LWT was the result of a Repropagate call
+ // out of a Consume or Release (where no new message is queued up, but
+ // everything remaining in the bounded buffer needs to be propagated out)
+ if (_PMessage != NULL)
+ {
+ _M_messageBuffer.enqueue(_PMessage);
+
+ // Only propagate when the new message became the head of the queue.
+ // If it did not, the current head is blocked waiting on Consume(),
+ // Release() or a new link_target(), and nothing behind it can be
+ // propagated out.
+ if (_M_messageBuffer.is_head(_PMessage->msg_id()))
+ {
+ _Propagate_priority_order();
+ }
+ }
+ else
+ {
+ // Repropagation pass: capacity may have been freed up, so while the
+ // current size is less than capacity try to consume any previously
+ // offered (postponed) message ids.
+ bool _ConsumedMsg = true;
+ while(_ConsumedMsg)
+ {
+ // Optimistically claim a buffer slot for the message we hope to
+ // consume; if the claim pushes the count past capacity, stop.
+ // The overshoot is undone by the decrement after the loop.
+ if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
+ {
+ break;
+ }
+
+ _ConsumedMsg = try_consume_msg();
+ }
+
+ // Undo exactly one optimistic increment: either the claim above
+ // overshot capacity, or the final try_consume_msg() found nothing
+ // to consume for the slot it had claimed.
+ _InterlockedDecrement(&_M_currentSize);
+ }
+ }
+\r
+ private:\r
+\r
+ /// <summary>\r
+ /// Attempts to propagate out any messages currently in the block.\r
+ /// </summary>\r
+ void _Propagate_priority_order()
+ {
+ message<_Target_type> * _Msg = _M_messageBuffer.peek();
+
+ // If someone has reserved the _Head message, don't propagate anymore
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+
+ // Offer the head message until the queue drains or an offer fails.
+ // An accepting target calls back into accept_message/consume_message,
+ // which is what actually dequeues the head.
+ while (_Msg != NULL)
+ {
+ message_status _Status = declined;
+
+ // Always start from the first target that linked.
+ for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
+ {
+ ITarget<_Target_type> * _PTarget = *_Iter;
+ _Status = _PTarget->propagate(_Msg, this);
+
+ // Ownership of message changed. Do not propagate this
+ // message to any other target.
+ if (_Status == accepted)
+ {
+ break;
+ }
+
+ // If the target just propagated to reserved this message, stop
+ // propagating it to others.
+ if (_M_pReservedFor != NULL)
+ {
+ break;
+ }
+ }
+
+ // If status is anything other than accepted, then the head message
+ // was not propagated out. Thus, nothing after it in the queue can
+ // be propagated out. Cease propagation.
+ if (_Status != accepted)
+ {
+ break;
+ }
+
+ // Get the next message (the accepted one was dequeued via the
+ // target's accept/consume callback).
+ _Msg = _M_messageBuffer.peek();
+ }
+ }
+\r
+ /// <summary>\r
+ /// Message buffer used to store messages.\r
+ /// </summary>\r
+ /// <summary>
+ /// Message buffer used to store messages.
+ /// </summary>
+ MessageQueue<_Type> _M_messageBuffer;
+
+ /// <summary>
+ /// Maximum number of messages bounded_buffer can hold.
+ /// </summary>
+ const size_t _M_capacity;
+
+ /// <summary>
+ /// Current number of messages in bounded_buffer. Manipulated only with
+ /// the _Interlocked* intrinsics; may transiently exceed _M_capacity
+ /// during the optimistic claim in propagate_to_any_targets.
+ /// </summary>
+ volatile long _M_currentSize;
+
+ /// <summary>
+ /// Lock used to guard the saved message ids map below.
+ /// </summary>
+ critical_section _M_savedIdsLock;
+
+ /// <summary>
+ /// Map of source links to saved (postponed) message ids.
+ /// </summary>
+ std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;
+
+ //
+ // Hide assignment operator and copy constructor
+ //
+ bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator
+ bounded_buffer(bounded_buffer const &); // no copy constructor
+ };\r
+\r
+ /// <summary>\r
+ /// A simple alternator, offers messages in order to each target\r
+ /// one at a time. If a consume occurs a message won't be offered to that target again\r
+ /// until all others are given a chance. This causes messages to be distributed more\r
+ /// evenly among targets.\r
+ /// </summary>\r
+ /// <typeparam name="_Type">\r
+ /// The payload type of messages stored and propagated by the buffer.\r
+ /// </typeparam>\r
+ template<class _Type>
+ class alternator : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
+ {
+ public:
+ /// <summary>
+ /// Creates an alternator within the default scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ alternator()
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target();
+ }
+
+ /// <summary>
+ /// Creates an alternator within the default scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ /// <param name="_Filter">
+ /// A reference to a filter function.
+ /// </param>
+ alternator(filter_method const& _Filter)
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target();
+ register_filter(_Filter);
+ }
+
+ /// <summary>
+ /// Creates an alternator within the specified scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ /// <param name="_PScheduler">
+ /// A reference to a scheduler instance.
+ /// </param>
+ alternator(Scheduler& _PScheduler)
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target(&_PScheduler);
+ }
+
+ /// <summary>
+ /// Creates an alternator within the specified scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ /// <param name="_PScheduler">
+ /// A reference to a scheduler instance.
+ /// </param>
+ /// <param name="_Filter">
+ /// A reference to a filter function.
+ /// </param>
+ alternator(Scheduler& _PScheduler, filter_method const& _Filter) 
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target(&_PScheduler);
+ register_filter(_Filter);
+ }
+
+ /// <summary>
+ /// Creates an alternator within the specified schedule group. The scheduler is implied
+ /// by the schedule group.
+ /// </summary>
+ /// <param name="_PScheduleGroup">
+ /// A reference to a schedule group.
+ /// </param>
+ alternator(ScheduleGroup& _PScheduleGroup)
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target(NULL, &_PScheduleGroup);
+ }
+
+ /// <summary>
+ /// Creates an alternator within the specified schedule group. The scheduler is implied
+ /// by the schedule group.
+ /// </summary>
+ /// <param name="_PScheduleGroup">
+ /// A reference to a schedule group.
+ /// </param>
+ /// <param name="_Filter">
+ /// A reference to a filter function.
+ /// </param>
+ alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
+ : _M_indexNextTarget(0)
+ {
+ initialize_source_and_target(NULL, &_PScheduleGroup);
+ register_filter(_Filter);
+ }
+
+ /// <summary>
+ /// Cleans up any resources that may have been created by the alternator.
+ /// </summary>
+ ~alternator()
+ {
+ // Remove all links
+ remove_network_links();
+ }
+
+ protected:
+
+ /// <summary>
+ /// The main propagate() function for ITarget blocks. Called by a source
+ /// block, generally within an asynchronous task to send messages to its targets.
+ /// </summary>
+ /// <param name="_PMessage">
+ /// A pointer to the message
+ /// </param>
+ /// <param name="_PSource">
+ /// A pointer to the source block offering the message.
+ /// </param>
+ /// <returns>
+ /// An indication of what the target decided to do with the message.
+ /// </returns>
+ /// <remarks>
+ /// It is important that calls to propagate do *not* take the same lock on the
+ /// internal structure that is used by Consume and the LWT. Doing so could
+ /// result in a deadlock with the Consume call. (in the case of the alternator,
+ /// this lock is the m_internalLock)
+ /// </remarks>
+ virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
+ {
+ message_status _Result = accepted;
+ //
+ // Accept the message being propagated
+ // Note: depending on the source block propagating the message
+ // this may not necessarily be the same message (pMessage) first
+ // passed into the function.
+ //
+ _PMessage = _PSource->accept(_PMessage->msg_id(), this);
+
+ if (_PMessage != NULL)
+ {
+ // Hand the accepted message to the LWT for propagation.
+ async_send(_PMessage);
+ }
+ else
+ {
+ // The source declined the accept; report the message as missed.
+ _Result = missed;
+ }
+
+ return _Result;
+ }
+
+ /// <summary>
+ /// Synchronously sends a message to this block. When this function completes the message will
+ /// already have propagated into the block.
+ /// </summary>
+ /// <param name="_PMessage">
+ /// A pointer to the message.
+ /// </param>
+ /// <param name="_PSource">
+ /// A pointer to the source block offering the message.
+ /// </param>
+ /// <returns>
+ /// An indication of what the target decided to do with the message.
+ /// </returns>
+ virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
+ {
+ _PMessage = _PSource->accept(_PMessage->msg_id(), this);
+
+ if (_PMessage != NULL)
+ {
+ sync_send(_PMessage);
+ }
+ else
+ {
+ return missed;
+ }
+
+ return accepted;
+ }
+
+ /// <summary>
+ /// Accepts an offered message by the source, transferring ownership to the caller.
+ /// </summary>
+ /// <param name="_MsgId">
+ /// The runtime object identity of the message.
+ /// </param>
+ /// <returns>
+ /// A pointer to the message that the caller now has ownership of.
+ /// </returns>
+ virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
+ {
+ //
+ // Peek at the head message in the message buffer. If the Ids match
+ // dequeue and transfer ownership
+ //
+ message<_Type> * _Msg = NULL;
+
+ if (_M_messageBuffer.is_head(_MsgId))
+ {
+ _Msg = _M_messageBuffer.dequeue();
+ }
+
+ return _Msg;
+ }
+
+ /// <summary>
+ /// Reserves a message previously offered by the source.
+ /// </summary>
+ /// <param name="_MsgId">
+ /// The runtime object identity of the message.
+ /// </param>
+ /// <returns>
+ /// A Boolean indicating whether the reservation worked or not.
+ /// </returns>
+ /// <remarks>
+ /// After 'reserve' is called, either 'consume' or 'release' must be called.
+ /// </remarks>
+ virtual bool reserve_message(runtime_object_identity _MsgId)
+ {
+ // Allow reservation if this is the head message
+ return _M_messageBuffer.is_head(_MsgId);
+ }
+
+ /// <summary>
+ /// Consumes a message that was reserved previously.
+ /// </summary>
+ /// <param name="_MsgId">
+ /// The runtime object identity of the message.
+ /// </param>
+ /// <returns>
+ /// A pointer to the message that the caller now has ownership of.
+ /// </returns>
+ /// <remarks>
+ /// Similar to 'accept', but is always preceded by a call to 'reserve'.
+ /// </remarks>
+ virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
+ {
+ // Update so we don't offer to this target again until
+ // all others have a chance. Scan the link registry for the
+ // reserving target and remember the slot just past it.
+ target_iterator _CurrentIter = _M_connectedTargets.begin();
+ for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i) 
+ {
+ if(*_CurrentIter == _M_pReservedFor)
+ {
+ _M_indexNextTarget = i + 1;
+ break;
+ }
+ }
+
+ // By default, accept the message
+ return accept_message(_MsgId);
+ }
+
+ /// <summary>
+ /// Releases a previous message reservation.
+ /// </summary>
+ /// <param name="_MsgId">
+ /// The runtime object identity of the message.
+ /// </param>
+ virtual void release_message(runtime_object_identity _MsgId)
+ {
+ // The head message is the one reserved.
+ if (!_M_messageBuffer.is_head(_MsgId))
+ {
+ throw message_not_found();
+ }
+ }
+
+ /// <summary>
+ /// Resumes propagation after a reservation has been released.
+ /// </summary>
+ virtual void resume_propagation()
+ {
+ // If there are any messages in the buffer, propagate them out
+ if (_M_messageBuffer.count() > 0)
+ {
+ async_send(NULL);
+ }
+ }
+
+ /// <summary>
+ /// Notification that a target was linked to this source.
+ /// </summary>
+ /// <param name="_PTarget">
+ /// A pointer to the newly linked target.
+ /// </param>
+ virtual void link_target_notification(ITarget<_Type> * _PTarget)
+ {
+ // If the message queue is blocked due to reservation
+ // there is no need to do any message propagation
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+
+ message<_Type> * _Msg = _M_messageBuffer.peek();
+
+ if (_Msg != NULL)
+ {
+ // Propagate the head message to the new target
+ message_status _Status = _PTarget->propagate(_Msg, this);
+
+ if (_Status == accepted)
+ {
+ // The target accepted the message, restart propagation.
+ propagate_to_any_targets(NULL);
+ }
+
+ // If the status is anything other than accepted, then leave
+ // the message queue blocked.
+ }
+ }
+
+ /// <summary>
+ /// Takes the message and propagates it to all the targets of this alternator.
+ /// This is called from async_send.
+ /// </summary>
+ /// <param name="_PMessage">
+ /// A pointer to a new message.
+ /// </param>
+ virtual void propagate_to_any_targets(message<_Type> * _PMessage)
+ {
+ // Enqueue pMessage to the internal buffer queue if it is non-NULL.
+ // pMessage can be NULL if this LWT was the result of a Repropagate call
+ // out of a Consume or Release (where no new message is queued up, but
+ // everything remaining in the unbounded buffer needs to be propagated out)
+ if (_PMessage != NULL)
+ {
+ _M_messageBuffer.enqueue(_PMessage);
+
+ // If the incoming pMessage is not the head message, we can safely assume that
+ // the head message is blocked and waiting on Consume(), Release() or a new
+ // link_target()
+ if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
+ {
+ return;
+ }
+ }
+
+ // Attempt to propagate messages to targets in order last left off.
+ _Propagate_alternating_order();
+ }
+
+ /// <summary>
+ /// Offers messages to targets in alternating order to help distribute messages
+ /// evenly among targets. The scan resumes at _M_indexNextTarget (the slot
+ /// after the last consuming target) and wraps around once.
+ /// </summary>
+ void _Propagate_alternating_order()
+ {
+ message<_Target_type> * _Msg = _M_messageBuffer.peek();
+
+ // If someone has reserved the _Head message, don't propagate anymore
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+
+ //
+ // Try to start where left off before, if the link has been removed
+ // or this is the first time then start at the beginning.
+ //
+ target_iterator _CurrentIter = _M_connectedTargets.begin();
+ const target_iterator _FirstLinkIter(_CurrentIter);
+ for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {}
+
+ while (_Msg != NULL)
+ {
+ message_status _Status = declined;
+
+ // Phase one: offer the message from the resume point to the end
+ // of the link registry.
+ target_iterator _StartedIter(_CurrentIter);
+ for(;*_CurrentIter != NULL; ++_CurrentIter)
+ {
+ _Status = (*_CurrentIter)->propagate(_Msg, this);
+ ++_M_indexNextTarget;
+
+ // Ownership of message changed. Do not propagate this
+ // message to any other target.
+ if (_Status == accepted)
+ {
+ ++_CurrentIter;
+ break;
+ }
+
+ // If the target just propagated to reserved this message, stop
+ // propagating it to others
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+ }
+
+ // Message ownership changed go to next messages.
+ if (_Status == accepted)
+ {
+ continue;
+ }
+
+ // Phase two: wrap around and offer from the beginning up to (but
+ // not including) the link where phase one started.
+ _M_indexNextTarget = 0;
+ for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter)
+ {
+ // I have offered the same message to all links now so stop.
+ if(*_CurrentIter == *_StartedIter)
+ {
+ break;
+ }
+
+ _Status = (*_CurrentIter)->propagate(_Msg, this);
+ ++_M_indexNextTarget;
+
+ // Ownership of message changed. Do not propagate this
+ // message to any other target.
+ if (_Status == accepted)
+ {
+ ++_CurrentIter;
+ break;
+ }
+
+ // If the target just propagated to reserved this message, stop
+ // propagating it to others
+ if (_M_pReservedFor != NULL)
+ {
+ return;
+ }
+ }
+
+ // If status is anything other than accepted, then the head message
+ // was not propagated out. Thus, nothing after it in the queue can
+ // be propagated out. Cease propagation.
+ if (_Status != accepted)
+ {
+ break;
+ }
+
+ // Get the next message
+ _Msg = _M_messageBuffer.peek();
+ }
+ }
+
+ private:
+
+ /// <summary>
+ /// Message queue used to store messages.
+ /// </summary>
+ MessageQueue<_Type> _M_messageBuffer;
+
+ /// <summary>
+ /// Index of next target to call propagate on. Used to alternate and load
+ /// balance message offering.
+ /// </summary>
+ size_t _M_indexNextTarget;
+
+ //
+ // Hide assignment operator and copy constructor.
+ //
+ alternator const &operator =(alternator const&); // no assignment operator
+ alternator(alternator const &); // no copy constructor
+ };
+\r
+ #include <agents.h>\r
+ \r
+ //\r
+ // Sample block that combines join and transform.\r
+ //\r
+ template<class _Input, class _Output, join_type _Jtype = non_greedy>\r
+ class join_transform : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
+ {\r
+ public:\r
+\r
+ typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
+\r
+ /// <summary>\r
+ /// Creates a join block within the default scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ join_transform(size_t _NumInputs, _Transform_method const& _Func)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs);
+ }
+\r
+ /// <summary>\r
+ /// Creates a join block within the default scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs);
+ register_filter(_Filter);
+ }
+\r
+ /// <summary>\r
+ /// Creates a join block within the specified scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ /// <param name="_PScheduler">
+ /// The scheduler onto which the task's message propagation will be scheduled.
+ /// </param>
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs, &_PScheduler);
+ }
+\r
+ /// <summary>\r
+ /// Creates a join block within the specified scheduler, and places it in any schedule
+ /// group of the scheduler's choosing.
+ /// </summary>
+ /// <param name="_PScheduler">
+ /// The scheduler onto which the task's message propagation will be scheduled.
+ /// </param>
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs, &_PScheduler);
+ register_filter(_Filter);
+ }
+\r
+ /// <summary>\r
+ /// Creates a join block within the specified schedule group. The scheduler is implied
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// The ScheduleGroup onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs, NULL, &_PScheduleGroup);
+ }
+\r
+ /// <summary>\r
+ /// Creates a join block within the specified schedule group. The scheduler is implied
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// The ScheduleGroup onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs this join will be allowed\r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
+ : _M_messageArray(_NumInputs, 0), // one accepted-message slot per input link
+ _M_savedMessageIdArray(_NumInputs, -1), // -1 marks "no postponed message id" for a slot
+ _M_pFunc(_Func) // user-supplied transform over the vector of joined inputs
+ {
+ _Initialize(_NumInputs, NULL, &_PScheduleGroup);
+ register_filter(_Filter);
+ }
+\r
+ /// <summary>\r
+ /// Destroys a join\r
+ /// </summary>\r
+ ~join_transform()
+ {
+ // Remove all links that are targets of this join
+ remove_network_links();
+
+ // NOTE(review): _M_savedIdBuffer is presumably allocated by _Initialize
+ // (not visible here) -- confirm it is always allocated before use.
+ delete [] _M_savedIdBuffer;
+ }
+\r
+ protected:\r
+ //\r
+ // propagator_block protected function implementations\r
+ //\r
+\r
+ /// <summary>\r
+ /// The main propagate() function for ITarget blocks. Called by a source\r
+ /// block, generally within an asynchronous task to send messages to its targets.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// The message being propagated\r
+ /// </param>\r
+ /// <param name="_PSource">\r
+ /// The source doing the propagation\r
+ /// </param>\r
+ /// <returns>\r
+ /// An indication of what the target decided to do with the message.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// It is important that calls to propagate do *not* take the same lock on the\r
+ /// internal structure that is used by Consume and the LWT. Doing so could\r
+ /// result in a deadlock with the Consume call. \r
+ /// </remarks>\r
+ message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) 
+ {
+ message_status _Ret_val = accepted;
+
+ //
+ // Find the slot index of this source by scanning the connected-source
+ // registry in link order.
+ //
+ size_t _Slot = 0;
+ bool _Found = false;
+ for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
+ {
+ if (*_Iter == _PSource)
+ {
+ _Found = true;
+ break;
+ }
+
+ _Slot++;
+ }
+
+ if (!_Found)
+ {
+ // If this source was not found in the array, this is not a connected source
+ // decline the message
+ return declined;
+ }
+
+ _ASSERTE(_Slot < _M_messageArray.size());
+
+ bool fIsGreedy = (_Jtype == greedy);
+
+ if (fIsGreedy)
+ {
+ //
+ // Greedy type joins immediately accept the message.
+ //
+ {
+ critical_section::scoped_lock lockHolder(_M_propagationLock);
+ if (_M_messageArray[_Slot] != NULL)
+ {
+ // The slot is already occupied: remember the offered id so the
+ // LWT can reserve/consume it later, and postpone for now.
+ _M_savedMessageIdArray[_Slot] = _PMessage->msg_id();
+ _Ret_val = postponed;
+ }
+ }
+
+ if (_Ret_val != postponed)
+ {
+ _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
+
+ if (_M_messageArray[_Slot] != NULL)
+ {
+ if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
+ {
+ // If messages have arrived on all links, start a propagation
+ // of the current message
+ async_send(NULL);
+ }
+ }
+ else
+ {
+ // accept failed; the source no longer has the message.
+ _Ret_val = missed;
+ }
+ }
+ }
+ else
+ {
+ //
+ // Non-greedy type joins save the message ids until they have all arrived
+ //
+
+ // NOTE(review): the cast below assumes runtime_object_identity is
+ // long-sized so the interlocked exchange is safe -- confirm on all
+ // target platforms.
+ if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)
+ {
+ // Decrement the message remaining count if this thread is switching 
+ // the saved id from -1 to a valid value.
+ if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
+ {
+ async_send(NULL);
+ }
+ }
+
+ // Always return postponed. This message will be consumed
+ // in the LWT
+ _Ret_val = postponed;
+ }
+
+ return _Ret_val;
+ }
+\r
+ /// <summary>\r
+ /// Accepts an offered message by the source, transferring ownership to the caller.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
+ {
+ // Ownership transfers only when the requested id matches the message
+ // at the head of the outbound queue; otherwise the caller gets NULL.
+ if (!_M_messageBuffer.is_head(_MsgId))
+ {
+ return NULL;
+ }
+
+ return _M_messageBuffer.dequeue();
+ }
+\r
+ /// <summary>\r
+ /// Reserves a message previously offered by the source.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A bool indicating whether the reservation worked or not\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// After 'reserve' is called, either 'consume' or 'release' must be called.\r
+ /// </remarks>\r
+ virtual bool reserve_message(runtime_object_identity _MsgId)
+ {
+ // A reservation is granted only for the message currently at the
+ // front of the outbound queue.
+ const bool _CanReserve = _M_messageBuffer.is_head(_MsgId);
+ return _CanReserve;
+ }
+\r
+ /// <summary>\r
+ /// Consumes a message previously offered by the source and reserved by the target, \r
+ /// transferring ownership to the caller.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// Similar to 'accept', but is always preceded by a call to 'reserve'\r
+ /// </remarks>\r
+ virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
+ {
+ // Consuming a reserved message transfers ownership exactly the way a
+ // plain accept does, so delegate to accept_message.
+ message<_Output> * _Result = accept_message(_MsgId);
+ return _Result;
+ }
+\r
+ /// <summary>\r
+ /// Releases a previous message reservation.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ virtual void release_message(runtime_object_identity _MsgId)
+ {
+ // Only the head message can hold a reservation; a release for any
+ // other id means the caller passed a bogus identity.
+ if (_M_messageBuffer.is_head(_MsgId))
+ {
+ return;
+ }
+
+ throw message_not_found();
+ }
+\r
+ /// <summary>\r
+ /// Resumes propagation after a reservation has been released\r
+ /// </summary>\r
+ virtual void resume_propagation()
+ {
+ // A non-empty outbound queue still has messages to offer; schedule an
+ // asynchronous repropagation pass (NULL = no new message enqueued).
+ if (_M_messageBuffer.count() != 0)
+ {
+ async_send(NULL);
+ }
+ }
+\r
+ /// <summary>\r
+ /// Notification that a target was linked to this source.\r
+ /// </summary>\r
+ /// <param name="_PTarget">\r
+ /// A pointer to the newly linked target.\r
+ /// </param>\r
+ virtual void link_target_notification(ITarget<_Output> *)
+ {
+ // While a reservation is outstanding the head message is pinned and
+ // no propagation can take place; otherwise offer the queued messages
+ // to the targets in priority order.
+ if (_M_pReservedFor == NULL)
+ {
+ _Propagate_priority_order(_M_messageBuffer);
+ }
+ }
+\r
+ /// <summary>\r
+ /// Takes the message and propagates it to all the target of this join.\r
+ /// This is called from async_send.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// The message being propagated\r
+ /// </param>\r
+ void propagate_to_any_targets(message<_Output> *) 
+ {
+ message<_Output> * _Msg = NULL;
+ // Create a new message from the input sources
+ // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
+ // a consume or release from the target. In that case we don't want to create a new message.
+ if (_M_messagesRemaining == 0)
+ {
+ // A greedy join can immediately create the message, a non-greedy
+ // join must try and consume all the messages it has postponed
+ _Msg = _Create_new_message();
+ }
+
+ if (_Msg == NULL)
+ {
+ // Create message failed. This happens in non_greedy joins when the
+ // reserve/consumption of a postponed message failed.
+ _Propagate_priority_order(_M_messageBuffer);
+ return;
+ }
+
+ bool fIsGreedy = (_Jtype == greedy);
+
+ // For a greedy join, reset the number of messages remaining
+ // Check to see if multiple messages have been passed in on any of the links,
+ // and postponed. If so, try and reserve/consume them now
+ if (fIsGreedy)
+ {
+ // Look at the saved ids and reserve/consume any that have passed in while
+ // this join was waiting to complete
+ _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size());
+
+ for (size_t i = 0; i < _M_messageArray.size(); i++)
+ {
+ for(;;)
+ {
+ runtime_object_identity _Saved_id;
+ // Grab the current saved id value. This value could be changing from based on any
+ // calls of source->propagate(this). If the message id is different than what is snapped
+ // here, that means, the reserve below must fail. This is because reserve is trying
+ // to get the same source lock the propagate(this) call must be holding.
+ {
+ critical_section::scoped_lock lockHolder(_M_propagationLock);
+
+ _ASSERTE(_M_messageArray[i] != NULL);
+
+ _Saved_id = _M_savedMessageIdArray[i];
+
+ if (_Saved_id == -1)
+ {
+ // No postponed message on this link; free the slot for
+ // the next propagate and move to the next link.
+ _M_messageArray[i] = NULL;
+ break;
+ }
+ else
+ {
+ // Claim the saved id (under the lock) before trying to
+ // reserve it outside the lock.
+ _M_savedMessageIdArray[i] = -1;
+ }
+ }
+
+ if (_Saved_id != -1)
+ {
+ source_iterator _Iter = _M_connectedSources.begin();
+ 
+ // Reserve/consume outside _M_propagationLock to avoid
+ // deadlocking with the source's propagate call.
+ ISource<_Input> * _PSource = _Iter[i];
+ if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
+ {
+ _M_messageArray[i] = _PSource->consume(_Saved_id, this);
+ _InterlockedDecrementSizeT(&_M_messagesRemaining);
+ break;
+ }
+ }
+ }
+ }
+
+ // If messages have all been received, async_send again, this will start the
+ // LWT up to create a new message
+ if (_M_messagesRemaining == 0)
+ {
+ async_send(NULL);
+ }
+ }
+ 
+ // Add the new message to the outbound queue
+ _M_messageBuffer.enqueue(_Msg);
+
+ if (!_M_messageBuffer.is_head(_Msg->msg_id()))
+ {
+ // another message is at the head of the outbound message queue and blocked
+ // simply return
+ return;
+ }
+
+ _Propagate_priority_order(_M_messageBuffer);
+ }
+\r
+ private:\r
+\r
+ //\r
+ // Private Methods\r
+ //\r
+\r
+ /// <summary>\r
+ /// Propagate messages in priority order\r
+ /// </summary>\r
+ /// <param name="_MessageBuffer">\r
+ /// Reference to a message queue with messages to be propagated\r
+ /// </param>\r
+ void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
+ {\r
+ message<_Output> * _Msg = _MessageBuffer.peek();\r
+\r
+ // If someone has reserved the _Head message, don't propagate anymore\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ return;\r
+ }\r
+\r
+ while (_Msg != NULL)\r
+ {\r
+ message_status _Status = declined;\r
+\r
+ // Always start from the first target that linked\r
+ for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
+ {\r
+ ITarget<_Output> * _PTarget = *_Iter;\r
+ _Status = _PTarget->propagate(_Msg, this);\r
+\r
+ // Ownership of message changed. Do not propagate this\r
+ // message to any other target.\r
+ if (_Status == accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // If the target just propagated to reserved this message, stop\r
+ // propagating it to others\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ break;\r
+ }\r
+ }\r
+\r
+ // If status is anything other than accepted, then the head message\r
+ // was not propagated out. Thus, nothing after it in the queue can\r
+ // be propagated out. Cease propagation.\r
+ if (_Status != accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // Get the next message\r
+ _Msg = _MessageBuffer.peek();\r
+ }\r
+ }\r
+\r
        /// <summary>
        /// Create a new message from the data output
        /// </summary>
        /// <returns>
        /// The created message (NULL if creation failed)
        /// </returns>
        /// <remarks>
        /// For a non-greedy join this first reserves and then consumes the
        /// postponed message on every connected source; if any reservation
        /// fails, all earlier reservations are released and NULL is returned.
        /// For a greedy join the input messages have already been consumed
        /// into _M_messageArray before this is called.
        /// </remarks>
        message<_Output> * __cdecl _Create_new_message()
        {
            bool fIsNonGreedy = (_Jtype == non_greedy);

            // If this is a non-greedy join, check each source and try to consume their message
            if (fIsNonGreedy)
            {

                // The iterator _Iter below will ensure that it is safe to touch
                // non-NULL source pointers. Take a snapshot.
                std::vector<ISource<_Input> *> _Sources;
                source_iterator _Iter = _M_connectedSources.begin();

                while (*_Iter != NULL)
                {
                    ISource<_Input> * _PSource = *_Iter;

                    if (_PSource == NULL)
                    {
                        break;
                    }

                    _Sources.push_back(_PSource);
                    ++_Iter;
                }

                if (_Sources.size() != _M_messageArray.size())
                {
                    // Some of the sources were unlinked. The join is broken
                    return NULL;
                }

                // First, try and reserve all the messages. If a reservation fails,
                // then release any reservations that had been made.
                for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)
                {
                    // Snap the current saved id into a buffer. This value can be changing behind the scenes from
                    // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
                    // fail.
                    _InterlockedIncrementSizeT(&_M_messagesRemaining);
                    _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);

                    // Every slot must have had a message pending, otherwise the
                    // join would not have been triggered.
                    _ASSERTE(_M_savedIdBuffer[i] != -1);

                    if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
                    {
                        // A reservation failed, release all reservations made up until
                        // this block, and wait for another message to arrive on this link
                        for (size_t j = 0; j < i; j++)
                        {
                            _Sources[j]->release(_M_savedIdBuffer[j], this);
                            // Restore the saved id so the released message can be
                            // reserved again on a later attempt. If the CAS fails a
                            // newer message replaced the slot in the meantime, so the
                            // remaining-message count must account for it instead.
                            if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1)
                            {
                                if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
                                {
                                    async_send(NULL);
                                }
                            }
                        }

                        // Return NULL to indicate that the create failed
                        return NULL;
                    } 
                }

                // Since everything has been reserved, consume all the messages.
                // This is guaranteed to return true.
                for (size_t i = 0; i < _M_messageArray.size(); i++)
                {
                    _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
                    _M_savedIdBuffer[i] = -1;
                }
            }

            if (!fIsNonGreedy)
            {
                // Reinitialize how many messages are being waited for.
                // This is safe because all messages have been received, thus no new async_sends for
                // greedy joins can be called.
                _M_messagesRemaining = _M_messageArray.size();
            }

            // Gather the payloads, freeing each input message once its payload
            // has been copied out.
            std::vector<_Input> _OutputVector;
            for (size_t i = 0; i < _M_messageArray.size(); i++)
            {
                _ASSERTE(_M_messageArray[i] != NULL);
                _OutputVector.push_back(_M_messageArray[i]->payload);

                delete _M_messageArray[i];
                if (fIsNonGreedy)
                {
                    _M_messageArray[i] = NULL;
                }
            }

            // Invoke the user-supplied transform and wrap its result in a new
            // output message.
            _Output _Out = _M_pFunc(_OutputVector);

            return (new message<_Output>(_Out));
        }
+\r
+ /// <summary>\r
+ /// Initialize the join block\r
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs\r
+ /// </param>\r
+ void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
+ {\r
+ initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
+\r
+ _M_connectedSources.set_bound(_NumInputs);\r
+ _M_messagesRemaining = _NumInputs;\r
+\r
+ bool fIsNonGreedy = (_Jtype == non_greedy);\r
+\r
+ if (fIsNonGreedy)\r
+ {\r
+ // Non greedy joins need a buffer to snap off saved message ids to.\r
+ _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
+ memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
+ }\r
+ else\r
+ {\r
+ _M_savedIdBuffer = NULL;\r
+ }\r
+ }\r
+\r
        // Count of input slots still waiting for a message in the current
        // round; when it reaches zero the LWT creates an output message.
        volatile size_t _M_messagesRemaining;

        // An array containing the accepted messages of this join, one slot
        // per input link.
        std::vector<message<_Input>*> _M_messageArray;

        // An array containing the msg ids of messages propagated to the array.
        // For greedy joins, this contains a log of other messages passed to this
        // join after the first has been accepted.
        // For non-greedy joins, this contains the message id of any message 
        // passed to it (-1 means no message is pending on that link).
        std::vector<runtime_object_identity> _M_savedMessageIdArray;

        // The transformer method called by this block
        _Transform_method _M_pFunc;

        // Buffer for snapping saved ids in non-greedy joins
        // (NULL for greedy joins -- see _Initialize)
        runtime_object_identity * _M_savedIdBuffer;

        // A lock for modifying the buffer or the connected blocks
        ::Concurrency::critical_section _M_propagationLock;

        // Queue to hold output messages
        MessageQueue<_Output> _M_messageBuffer;
+ };\r
+\r
+ //\r
+ // Message block that invokes a transform method when it receives message on any of the input links.\r
 // A typical example is the recalculation engine for a cell in an Excel spreadsheet.
+ // (Remember that a normal join block is triggered only when it receives messages on all its input links).\r
+ //\r
+ template<class _Input, class _Output>\r
+ class recalculate : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
+ {\r
+ public:\r
+ typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method;\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the default scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ recalculate(size_t _NumInputs, _Recalculate_method const& _Func)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the default scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs);\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the specified scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_Scheduler">\r
+ /// The scheduler onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs, &_PScheduler);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the specified scheduler, and places it any schedule\r
+ /// group of the scheduler\92s choosing.\r
+ /// </summary>\r
+ /// <param name="_Scheduler">\r
+ /// The scheduler onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs, &_PScheduler);\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the specified schedule group. The scheduler is implied\r
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// The ScheduleGroup onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create an recalculate block within the specified schedule group. The scheduler is implied\r
+ /// by the schedule group.\r
+ /// </summary>\r
+ /// <param name="_PScheduleGroup">\r
+ /// The ScheduleGroup onto which the task's message propagation will be scheduled.\r
+ /// </param>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs \r
+ /// </param>\r
+ /// <param name="_Filter">\r
+ /// A filter method placed on this join\r
+ /// </param>\r
+ recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter)\r
+ : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
+ _M_pFunc(_Func)\r
+ {\r
+ _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
+ register_filter(_Filter);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Destroys a join\r
+ /// </summary>\r
+ ~recalculate()\r
+ {\r
+ // Remove all links that are targets of this join\r
+ remove_network_links();\r
+\r
+ delete [] _M_savedIdBuffer;\r
+ }\r
+\r
+ protected:\r
+ //\r
+ // propagator_block protected function implementations\r
+ //\r
+\r
+ /// <summary>\r
+ /// The main propagate() function for ITarget blocks. Called by a source\r
+ /// block, generally within an asynchronous task to send messages to its targets.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// The message being propagated\r
+ /// </param>\r
+ /// <param name="_PSource">\r
+ /// The source doing the propagation\r
+ /// </param>\r
+ /// <returns>\r
+ /// An indication of what the target decided to do with the message.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// It is important that calls to propagate do *not* take the same lock on the\r
+ /// internal structure that is used by Consume and the LWT. Doing so could\r
+ /// result in a deadlock with the Consume call. \r
+ /// </remarks>\r
+ message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) \r
+ {\r
+ //\r
+ // Find the slot index of this source\r
+ //\r
+ size_t _Slot = 0;\r
+ bool _Found = false;\r
+ for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)\r
+ {\r
+ if (*_Iter == _PSource)\r
+ {\r
+ _Found = true;\r
+ break;\r
+ }\r
+\r
+ _Slot++;\r
+ }\r
+\r
+ if (!_Found)\r
+ {\r
+ // If this source was not found in the array, this is not a connected source\r
+ // decline the message\r
+ return declined;\r
+ }\r
+\r
+ //\r
+ // Save the message id\r
+ //\r
+ if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)\r
+ {\r
+ // If it is not seen by Create_message attempt a recalculate\r
+ async_send(NULL);\r
+ }\r
+\r
+ // Always return postponed. This message will be consumed\r
+ // in the LWT\r
+ return postponed;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Accepts an offered message by the source, transferring ownership to the caller.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ virtual message<_Output> * accept_message(runtime_object_identity _MsgId)\r
+ {\r
+ //\r
+ // Peek at the head message in the message buffer. If the Ids match\r
+ // dequeue and transfer ownership\r
+ //\r
+ message<_Output> * _Msg = NULL;\r
+\r
+ if (_M_messageBuffer.is_head(_MsgId))\r
+ {\r
+ _Msg = _M_messageBuffer.dequeue();\r
+ }\r
+\r
+ return _Msg;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Reserves a message previously offered by the source.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A bool indicating whether the reservation worked or not\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// After 'reserve' is called, either 'consume' or 'release' must be called.\r
+ /// </remarks>\r
+ virtual bool reserve_message(runtime_object_identity _MsgId)\r
+ {\r
+ // Allow reservation if this is the head message\r
+ return _M_messageBuffer.is_head(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Consumes a message previously offered by the source and reserved by the target, \r
+ /// transferring ownership to the caller.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ /// <returns>\r
+ /// A pointer to the message that the caller now has ownership of.\r
+ /// </returns>\r
+ /// <remarks>\r
+ /// Similar to 'accept', but is always preceded by a call to 'reserve'\r
+ /// </remarks>\r
+ virtual message<_Output> * consume_message(runtime_object_identity _MsgId)\r
+ {\r
+ // By default, accept the message\r
+ return accept_message(_MsgId);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Releases a previous message reservation.\r
+ /// </summary>\r
+ /// <param name="_MsgId">\r
+ /// The runtime object identity of the message.\r
+ /// </param>\r
+ virtual void release_message(runtime_object_identity _MsgId)\r
+ {\r
+ // The head message is the one reserved.\r
+ if (!_M_messageBuffer.is_head(_MsgId))\r
+ {\r
+ throw message_not_found();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Resumes propagation after a reservation has been released\r
+ /// </summary>\r
+ virtual void resume_propagation()\r
+ {\r
+ // If there are any messages in the buffer, propagate them out\r
+ if (_M_messageBuffer.count() > 0)\r
+ {\r
+ async_send(NULL);\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Notification that a target was linked to this source.\r
+ /// </summary>\r
+ /// <param name="_PTarget">\r
+ /// A pointer to the newly linked target.\r
+ /// </param>\r
+ virtual void link_target_notification(ITarget<_Output> *)\r
+ {\r
+ // If the message queue is blocked due to reservation\r
+ // there is no need to do any message propagation\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ return;\r
+ }\r
+\r
+ _Propagate_priority_order(_M_messageBuffer);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Takes the message and propagates it to all the target of this join.\r
+ /// This is called from async_send.\r
+ /// </summary>\r
+ /// <param name="_PMessage">\r
+ /// The message being propagated\r
+ /// </param>\r
+ void propagate_to_any_targets(message<_Output> *) \r
+ {\r
+ // Attempt to create a new message\r
+ message<_Output> * _Msg = _Create_new_message();\r
+\r
+ if (_Msg != NULL)\r
+ {\r
+ // Add the new message to the outbound queue\r
+ _M_messageBuffer.enqueue(_Msg);\r
+\r
+ if (!_M_messageBuffer.is_head(_Msg->msg_id()))\r
+ {\r
+ // another message is at the head of the outbound message queue and blocked\r
+ // simply return\r
+ return;\r
+ }\r
+ }\r
+\r
+ _Propagate_priority_order(_M_messageBuffer);\r
+ }\r
+\r
+ private:\r
+\r
+ //\r
+ // Private Methods\r
+ //\r
+\r
+ /// <summary>\r
+ /// Propagate messages in priority order\r
+ /// </summary>\r
+ /// <param name="_MessageBuffer">\r
+ /// Reference to a message queue with messages to be propagated\r
+ /// </param>\r
+ void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
+ {\r
+ message<_Output> * _Msg = _MessageBuffer.peek();\r
+\r
+ // If someone has reserved the _Head message, don't propagate anymore\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ return;\r
+ }\r
+\r
+ while (_Msg != NULL)\r
+ {\r
+ message_status _Status = declined;\r
+\r
+ // Always start from the first target that linked\r
+ for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
+ {\r
+ ITarget<_Output> * _PTarget = *_Iter;\r
+ _Status = _PTarget->propagate(_Msg, this);\r
+\r
+ // Ownership of message changed. Do not propagate this\r
+ // message to any other target.\r
+ if (_Status == accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // If the target just propagated to reserved this message, stop\r
+ // propagating it to others\r
+ if (_M_pReservedFor != NULL)\r
+ {\r
+ break;\r
+ }\r
+ }\r
+\r
+ // If status is anything other than accepted, then the head message\r
+ // was not propagated out. Thus, nothing after it in the queue can\r
+ // be propagated out. Cease propagation.\r
+ if (_Status != accepted)\r
+ {\r
+ break;\r
+ }\r
+\r
+ // Get the next message\r
+ _Msg = _MessageBuffer.peek();\r
+ }\r
+ }\r
+\r
+ /// <summary>\r
+ /// Create a new message from the data output\r
+ /// </summary>\r
+ /// <returns>\r
+ /// The created message (NULL if creation failed)\r
+ /// </returns>\r
+ message<_Output> * __cdecl _Create_new_message()\r
+ {\r
+ // If this is a non-greedy join, check each source and try to consume their message\r
+ size_t _NumInputs = _M_savedMessageIdArray.size();\r
+\r
+ // The iterator _Iter below will ensure that it is safe to touch\r
+ // non-NULL source pointers. Take a snapshot.\r
+ std::vector<ISource<_Input> *> _Sources;\r
+ source_iterator _Iter = _M_connectedSources.begin();\r
+\r
+ while (*_Iter != NULL)\r
+ {\r
+ ISource<_Input> * _PSource = *_Iter;\r
+\r
+ if (_PSource == NULL)\r
+ {\r
+ break;\r
+ }\r
+\r
+ _Sources.push_back(_PSource);\r
+ ++_Iter;\r
+ }\r
+\r
+ if (_Sources.size() != _NumInputs)\r
+ {\r
+ // Some of the sources were unlinked. The join is broken\r
+ return NULL;\r
+ }\r
+\r
+ // First, try and reserve all the messages. If a reservation fails,\r
+ // then release any reservations that had been made.\r
+ for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)\r
+ {\r
+ // Swap the id to -1 indicating that we have used that value for a recalculate\r
+ _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);\r
+\r
+ // If the id is -1, either we have never received a message on that link or the previous message is stored\r
+ // in the message array. If it is the former we abort. \r
+ // If the id is not -1, we attempt to reserve the message. On failure we abort.\r
+ if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL))\r
+ || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this)))\r
+ {\r
+ // Abort. Release all reservations made up until this block, \r
+ // and wait for another message to arrive.\r
+ for (size_t j = 0; j < i; j++)\r
+ {\r
+ if (_M_savedIdBuffer[j] != -1)\r
+ {\r
+ _Sources[j]->release(_M_savedIdBuffer[j], this);\r
+ _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1);\r
+ }\r
+ }\r
+\r
+ // Return NULL to indicate that the create failed\r
+ return NULL;\r
+ } \r
+ }\r
+\r
+ // Since everything has been reserved, consume all the messages.\r
+ // This is guaranteed to return true.\r
+ size_t _NewMessages = 0;\r
+ for (size_t i = 0; i < _NumInputs; i++)\r
+ {\r
+ if (_M_savedIdBuffer[i] != -1)\r
+ {\r
+ // Delete previous message since we have a new one\r
+ if (_M_messageArray[i] != NULL)\r
+ {\r
+ delete _M_messageArray[i];\r
+ }\r
+ _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);\r
+ _M_savedIdBuffer[i] = -1;\r
+ _NewMessages++;\r
+ }\r
+ }\r
+\r
+ if (_NewMessages == 0)\r
+ {\r
+ // There is no need to recal if we did not consume a new message\r
+ return NULL;\r
+ }\r
+\r
+ std::vector<_Input> _OutputVector;\r
+ for (size_t i = 0; i < _NumInputs; i++)\r
+ {\r
+ _ASSERTE(_M_messageArray[i] != NULL);\r
+ _OutputVector.push_back(_M_messageArray[i]->payload);\r
+ }\r
+\r
+ _Output _Out = _M_pFunc(_OutputVector);\r
+ return (new message<_Output>(_Out));\r
+ }\r
+\r
+ /// <summary>\r
+ /// Initialize the recalculate block\r
+ /// </summary>\r
+ /// <param name="_NumInputs">\r
+ /// The number of inputs\r
+ /// </param>\r
+ void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
+ {\r
+ initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
+\r
+ _M_connectedSources.set_bound(_NumInputs);\r
+\r
+ // Non greedy joins need a buffer to snap off saved message ids to.\r
+ _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
+ memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
+ }\r
+\r
+ // An array containing the accepted messages of this join.\r
+ std::vector<message<_Input>*> _M_messageArray;\r
+\r
+ // An array containing the msg ids of messages propagated to the array\r
+ // For non-greedy joins, this contains the message id of any message \r
+ // passed to it.\r
+ std::vector<runtime_object_identity> _M_savedMessageIdArray;\r
+\r
+ // Buffer for snapping saved ids in non-greedy joins\r
+ runtime_object_identity * _M_savedIdBuffer;\r
+\r
+ // The transformer method called by this block\r
+ _Recalculate_method _M_pFunc;\r
+\r
+ // Queue to hold output messages\r
+ MessageQueue<_Output> _M_messageBuffer;\r
+ };\r
+\r
+ //\r
+ // Container class to hold a join_transform block and keep unbounded buffers in front of each input.\r
+ //\r
+ template<class _Input, class _Output>\r
+ class buffered_join\r
+ {\r
+ typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
+\r
+ public:\r
+ buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs)\r
+ {\r
+ m_buffers = new unbounded_buffer<_Input>*[_NumInputs];\r
+ m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func);\r
+\r
+ for(int i = 0; i < _NumInputs; i++)\r
+ {\r
+ m_buffers[i] = new unbounded_buffer<_Input>;\r
+ m_buffers[i]->link_target(m_join);\r
+ }\r
+ }\r
+\r
+ ~buffered_join()\r
+ {\r
+ for(int i = 0; i < m_numInputs; i++)\r
+ delete m_buffers[i];\r
+ delete [] m_buffers;\r
+ delete m_join;\r
+ }\r
+\r
+ // Add input takes _PSource and connects it to the next input on this block\r
+ void add_input(ISource<_Input> * _PSource)\r
+ {\r
+ _PSource->link_target(m_buffers[m_currentInput]);\r
+ m_currentInput++;\r
+ }\r
+\r
+ // link_target links this container block to _PTarget\r
+ void link_target(ITarget<_Output> * _PTarget)\r
+ {\r
+ m_join->link_target(_PTarget);\r
+ }\r
+ private:\r
+\r
+ int m_currentInput;\r
+ int m_numInputs;\r
+ unbounded_buffer<_Input> ** m_buffers;\r
+ join_transform<_Input,_Output,greedy> * m_join;\r
+ };\r
+} // namespace Concurrency\r