1 //--------------------------------------------------------------------------
\r
3 // Copyright (c) Microsoft Corporation. All rights reserved.
\r
5 // File: agents_extras.h
\r
7 // Implementation of various useful message blocks
\r
9 //--------------------------------------------------------------------------
\r
15 // bounded_buffer uses a map
\r
19 namespace Concurrency
\r
22 /// Simple queue class for storing messages.
\r
24 /// <typeparam name="_Type">
\r
25 /// The payload type of messages stored in this queue.
\r
27 template <class _Type>
\r
31 typedef message<_Type> _Message;
\r
34 /// Constructs an initially empty queue.
\r
41 /// Removes and deletes any messages remaining in the queue.
\r
45 _Message * _Msg = dequeue();
\r
46 while (_Msg != NULL)
\r
54 /// Add an item to the queue.
\r
56 /// <param name="_Msg">
\r
59 void enqueue(_Message *_Msg)
\r
61 _M_queue.push(_Msg);
\r
65 /// Dequeue an item from the head of queue.
\r
68 /// Returns a pointer to the message found at the head of the queue.
\r
70 _Message * dequeue()
\r
72 _Message * _Msg = NULL;
\r
74 if (!_M_queue.empty())
\r
76 _Msg = _M_queue.front();
\r
84 /// Return the item at the head of the queue, without dequeuing
\r
87 /// Returns a pointer to the message found at the head of the queue.
\r
89 _Message * peek() const
\r
91 _Message * _Msg = NULL;
\r
93 if (!_M_queue.empty())
\r
95 _Msg = _M_queue.front();
\r
102 /// Returns the number of items currently in the queue.
\r
105 /// Size of the queue.
\r
107 size_t count() const
\r
109 return _M_queue.size();
\r
113 /// Checks to see if specified msg id is at the head of the queue.
\r
115 /// <param name="_MsgId">
\r
116 /// Message id to check for.
\r
119 /// True if a message with specified id is at the head, false otherwise.
\r
121 bool is_head(const runtime_object_identity _MsgId) const
\r
123 _Message * _Msg = peek();
\r
126 return _Msg->msg_id() == _MsgId;
\r
133 std::queue<_Message *> _M_queue;
\r
137 /// Simple queue implementation that takes into account priority
\r
138 /// using the comparison operator <.
\r
140 /// <typeparam name="_Type">
\r
141 /// The payload type of messages stored in this queue.
\r
143 template <class _Type>
\r
144 class PriorityQueue
\r
148 /// Constructs an initially empty queue.
\r
150 PriorityQueue() : _M_pHead(NULL), _M_count(0) {}
\r
153 /// Removes and deletes any messages remaining in the queue.
\r
157 message<_Type> * _Msg = dequeue();
\r
158 while (_Msg != NULL)
\r
166 /// Add an item to the queue, comparisons using the 'payload' field
\r
167 /// will determine the location in the queue.
\r
169 /// <param name="_Msg">
\r
170 /// Message to add.
\r
172 /// <param name="fInsertAtHead">
\r
173 /// True if this new message can be inserted at the head.
\r
175 void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true)
\r
177 MessageNode *_Element = new MessageNode();
\r
178 _Element->_M_pMsg = _Msg;
\r
180 // Find location to insert.
\r
181 MessageNode *pCurrent = _M_pHead;
\r
182 MessageNode *pPrev = NULL;
\r
183 if(!fInsertAtHead && pCurrent != NULL)
\r
186 pCurrent = pCurrent->_M_pNext;
\r
188 while(pCurrent != NULL)
\r
190 if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload)
\r
195 pCurrent = pCurrent->_M_pNext;
\r
201 _M_pHead = _Element;
\r
205 pPrev->_M_pNext = _Element;
\r
208 // Last item in queue.
\r
209 if(pCurrent == NULL)
\r
211 _Element->_M_pNext = NULL;
\r
215 _Element->_M_pNext = pCurrent;
\r
222 /// Dequeue an item from the head of queue.
\r
225 /// Returns a pointer to the message found at the head of the queue.
\r
227 message<_Type> * dequeue()
\r
229 if (_M_pHead == NULL)
\r
234 MessageNode *_OldHead = _M_pHead;
\r
235 message<_Type> * _Result = _OldHead->_M_pMsg;
\r
237 _M_pHead = _OldHead->_M_pNext;
\r
241 if(--_M_count == 0)
\r
249 /// Return the item at the head of the queue, without dequeuing
\r
252 /// Returns a pointer to the message found at the head of the queue.
\r
254 message<_Type> * peek() const
\r
258 return _M_pHead->_M_pMsg;
\r
264 /// Returns the number of items currently in the queue.
\r
267 /// Size of the queue.
\r
269 size_t count() const
\r
275 /// Checks to see if specified msg id is at the head of the queue.
\r
277 /// <param name="_MsgId">
\r
278 /// Message id to check for.
\r
281 /// True if a message with specified id is at the head, false otherwise.
\r
283 bool is_head(const runtime_object_identity _MsgId) const
\r
287 return _M_pHead->_M_pMsg->msg_id() == _MsgId;
\r
294 // Used to store individual message nodes.
\r
297 MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {}
\r
298 message<_Type> * _M_pMsg;
\r
299 MessageNode * _M_pNext;
\r
302 // A pointer to the head of the queue.
\r
303 MessageNode * _M_pHead;
\r
305 // The number of elements presently stored in the queue.
\r
310 /// priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine
\r
311 /// order when offering to targets. Besides this it acts exactly like an unbounded_buffer.
\r
313 /// <typeparam name="_Type">
\r
314 /// The payload type of messages stored and propagated by the buffer.
\r
316 template<class _Type>
\r
317 class priority_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
322 /// Creates a priority_buffer within the default scheduler, and places it in any schedule
\r
323 /// group of the scheduler's choosing.
\r
327 initialize_source_and_target();
\r
331 /// Creates a priority_buffer within the default scheduler, and places it in any schedule
\r
332 /// group of the scheduler's choosing.
\r
334 /// <param name="_Filter">
\r
335 /// A reference to a filter function.
\r
337 priority_buffer(filter_method const& _Filter)
\r
339 initialize_source_and_target();
\r
340 register_filter(_Filter);
\r
344 /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
\r
345 /// group of the scheduler's choosing.
\r
347 /// <param name="_PScheduler">
\r
348 /// A reference to a scheduler instance.
\r
350 priority_buffer(Scheduler& _PScheduler)
\r
352 initialize_source_and_target(&_PScheduler);
\r
356 /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
\r
357 /// group of the scheduler's choosing.
\r
359 /// <param name="_PScheduler">
\r
360 /// A reference to a scheduler instance.
\r
362 /// <param name="_Filter">
\r
363 /// A reference to a filter function.
\r
365 priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter)
\r
367 initialize_source_and_target(&_PScheduler);
\r
368 register_filter(_Filter);
\r
372 /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
\r
373 /// by the schedule group.
\r
375 /// <param name="_PScheduleGroup">
\r
376 /// A reference to a schedule group.
\r
378 priority_buffer(ScheduleGroup& _PScheduleGroup)
\r
380 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
384 /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
\r
385 /// by the schedule group.
\r
387 /// <param name="_PScheduleGroup">
\r
388 /// A reference to a schedule group.
\r
390 /// <param name="_Filter">
\r
391 /// A reference to a filter function.
\r
393 priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
395 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
396 register_filter(_Filter);
\r
400 /// Cleans up any resources that may have been created by the priority_buffer.
\r
404 // Remove all links
\r
405 remove_network_links();
\r
409 /// Add an item to the priority_buffer
\r
411 /// <param name="_Item">
\r
412 /// A reference to the item to add.
\r
415 /// A boolean indicating whether the data was accepted.
\r
417 bool enqueue(_Type const& _Item)
\r
419 return Concurrency::send<_Type>(this, _Item);
\r
423 /// Remove an item from the priority_buffer
\r
426 /// The message payload.
\r
430 return receive<_Type>(this);
\r
436 /// The main propagate() function for ITarget blocks. Called by a source
\r
437 /// block, generally within an asynchronous task to send messages to its targets.
\r
439 /// <param name="_PMessage">
\r
440 /// A pointer to the message.
\r
442 /// <param name="_PSource">
\r
443 /// A pointer to the source block offering the message.
\r
446 /// An indication of what the target decided to do with the message.
\r
449 /// It is important that calls to propagate do *not* take the same lock on the
\r
450 /// internal structure that is used by Consume and the LWT. Doing so could
\r
451 /// result in a deadlock with the Consume call. (in the case of the priority_buffer,
\r
452 /// this lock is the m_internalLock)
\r
454 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
456 message_status _Result = accepted;
\r
458 // Accept the message being propagated
\r
459 // Note: depending on the source block propagating the message
\r
460 // this may not necessarily be the same message (pMessage) first
\r
461 // passed into the function.
\r
463 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
465 if (_PMessage != NULL)
\r
467 async_send(_PMessage);
\r
478 /// Synchronously sends a message to this block. When this function completes the message will
\r
479 /// already have propagated into the block.
\r
481 /// <param name="_PMessage">
\r
482 /// A pointer to the message.
\r
484 /// <param name="_PSource">
\r
485 /// A pointer to the source block offering the message.
\r
488 /// An indication of what the target decided to do with the message.
\r
490 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
492 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
494 if (_PMessage != NULL)
\r
496 sync_send(_PMessage);
\r
507 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
509 /// <param name="_MsgId">
\r
510 /// The runtime object identity of the message.
\r
513 /// A pointer to the message that the caller now has ownership of.
\r
515 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
518 // Peek at the head message in the message buffer. If the Ids match
\r
519 // dequeue and transfer ownership
\r
521 message<_Type> * _Msg = NULL;
\r
523 if (_M_messageBuffer.is_head(_MsgId))
\r
525 _Msg = _M_messageBuffer.dequeue();
\r
532 /// Reserves a message previously offered by the source.
\r
534 /// <param name="_MsgId">
\r
535 /// The runtime object identity of the message.
\r
538 /// A Boolean indicating whether the reservation worked or not.
\r
541 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
543 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
545 // Allow reservation if this is the head message
\r
546 return _M_messageBuffer.is_head(_MsgId);
\r
550 /// Consumes a message that was reserved previously.
\r
552 /// <param name="_MsgId">
\r
553 /// The runtime object identity of the message.
\r
556 /// A pointer to the message that the caller now has ownership of.
\r
559 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
561 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
563 // By default, accept the message
\r
564 return accept_message(_MsgId);
\r
568 /// Releases a previous message reservation.
\r
570 /// <param name="_MsgId">
\r
571 /// The runtime object identity of the message.
\r
573 virtual void release_message(runtime_object_identity _MsgId)
\r
575 // The head message is the one reserved.
\r
576 if (!_M_messageBuffer.is_head(_MsgId))
\r
578 throw message_not_found();
\r
583 /// Resumes propagation after a reservation has been released
\r
585 virtual void resume_propagation()
\r
587 // If there are any messages in the buffer, propagate them out
\r
588 if (_M_messageBuffer.count() > 0)
\r
595 /// Notification that a target was linked to this source.
\r
597 /// <param name="_PTarget">
\r
598 /// A pointer to the newly linked target.
\r
600 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
602 // If the message queue is blocked due to reservation
\r
603 // there is no need to do any message propagation
\r
604 if (_M_pReservedFor != NULL)
\r
609 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
613 // Propagate the head message to the new target
\r
614 message_status _Status = _PTarget->propagate(_Msg, this);
\r
616 if (_Status == accepted)
\r
618 // The target accepted the message, restart propagation.
\r
619 propagate_to_any_targets(NULL);
\r
622 // If the status is anything other than accepted, then leave
\r
623 // the message queue blocked.
\r
628 /// Takes the message and propagates it to all the targets of this priority_buffer.
\r
630 /// <param name="_PMessage">
\r
631 /// A pointer to a new message.
\r
633 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
635 // Enqueue _PMessage to the internal priority queue if it is non-NULL.
\r
636 // _PMessage can be NULL if this LWT was the result of a Repropagate call
\r
637 // out of a Consume or Release (where no new message is queued up, but
\r
638 // everything remaining in the priority_buffer needs to be propagated out)
\r
639 if (_PMessage != NULL)
\r
641 message<_Type> *pPrevHead = _M_messageBuffer.peek();
\r
643 // If a reservation is held make sure to not insert this new
\r
644 // message before it.
\r
645 if(_M_pReservedFor != NULL)
\r
647 _M_messageBuffer.enqueue(_PMessage, false);
\r
651 _M_messageBuffer.enqueue(_PMessage);
\r
654 // If the head message didn't change, we can safely assume that
\r
655 // the head message is blocked and waiting on Consume(), Release() or a new
\r
657 if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id()))
\r
663 // Attempt to propagate messages to all the targets
\r
664 _Propagate_priority_order();
\r
670 /// Attempts to propagate out any messages currently in the block.
\r
672 void _Propagate_priority_order()
\r
674 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
676 // If someone has reserved the _Head message, don't propagate anymore
\r
677 if (_M_pReservedFor != NULL)
\r
682 while (_Msg != NULL)
\r
684 message_status _Status = declined;
\r
686 // Always start from the first target that linked.
\r
687 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
689 ITarget<_Target_type> * _PTarget = *_Iter;
\r
690 _Status = _PTarget->propagate(_Msg, this);
\r
692 // Ownership of message changed. Do not propagate this
\r
693 // message to any other target.
\r
694 if (_Status == accepted)
\r
699 // If the target we just propagated to has reserved this message, stop
\r
700 // propagating it to others.
\r
701 if (_M_pReservedFor != NULL)
\r
707 // If status is anything other than accepted, then the head message
\r
708 // was not propagated out. Thus, nothing after it in the queue can
\r
709 // be propagated out. Cease propagation.
\r
710 if (_Status != accepted)
\r
715 // Get the next message
\r
716 _Msg = _M_messageBuffer.peek();
\r
721 /// Priority Queue used to store messages.
\r
723 PriorityQueue<_Type> _M_messageBuffer;
\r
726 // Hide assignment operator and copy constructor.
\r
728 priority_buffer const &operator =(priority_buffer const&); // no assignment operator
\r
729 priority_buffer(priority_buffer const &); // no copy constructor
\r
734 /// A bounded_buffer implementation. Once the capacity is reached it will save the offered message
\r
735 /// id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume
\r
736 /// any of the postponed messages. Preference is given to previously offered messages before new ones.
\r
738 /// NOTE: this bounded_buffer implementation contains code that is very unique to this particular block.
\r
739 /// Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer
\r
740 /// implementation uses a critical_section, several interlocked operations, and additional calls to async_send.
\r
741 /// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.
\r
743 /// <typeparam name="_Type">
\r
744 /// The payload type of messages stored and propagated by the buffer.
\r
746 template<class _Type>
\r
747 class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
751 /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
\r
752 /// group of the scheduler's choosing.
\r
754 bounded_buffer(const size_t capacity)
\r
755 : _M_capacity(capacity), _M_currentSize(0)
\r
757 initialize_source_and_target();
\r
761 /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
\r
762 /// group of the scheduler's choosing.
\r
764 /// <param name="_Filter">
\r
765 /// A reference to a filter function.
\r
767 bounded_buffer(const size_t capacity, filter_method const& _Filter)
\r
768 : _M_capacity(capacity), _M_currentSize(0)
\r
770 initialize_source_and_target();
\r
771 register_filter(_Filter);
\r
775 /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
\r
776 /// group of the scheduler's choosing.
\r
778 /// <param name="_PScheduler">
\r
779 /// A reference to a scheduler instance.
\r
781 bounded_buffer(const size_t capacity, Scheduler& _PScheduler)
\r
782 : _M_capacity(capacity), _M_currentSize(0)
\r
784 initialize_source_and_target(&_PScheduler);
\r
788 /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
\r
789 /// group of the scheduler's choosing.
\r
791 /// <param name="_PScheduler">
\r
792 /// A reference to a scheduler instance.
\r
794 /// <param name="_Filter">
\r
795 /// A reference to a filter function.
\r
797 bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter)
\r
798 : _M_capacity(capacity), _M_currentSize(0)
\r
800 initialize_source_and_target(&_PScheduler);
\r
801 register_filter(_Filter);
\r
805 /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
\r
806 /// by the schedule group.
\r
808 /// <param name="_PScheduleGroup">
\r
809 /// A reference to a schedule group.
\r
811 bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)
\r
812 : _M_capacity(capacity), _M_currentSize(0)
\r
814 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
818 /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
\r
819 /// by the schedule group.
\r
821 /// <param name="_PScheduleGroup">
\r
822 /// A reference to a schedule group.
\r
824 /// <param name="_Filter">
\r
825 /// A reference to a filter function.
\r
827 bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
828 : _M_capacity(capacity), _M_currentSize(0)
\r
830 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
831 register_filter(_Filter);
\r
835 /// Cleans up any resources that may have been used by the bounded_buffer.
\r
839 // Remove all links
\r
840 remove_network_links();
\r
844 /// Add an item to the bounded_buffer.
\r
846 /// <param name="_Item">
\r
847 /// A reference to the item to add.
\r
850 /// A boolean indicating whether the data was accepted.
\r
852 bool enqueue(_Type const& _Item)
\r
854 return Concurrency::send<_Type>(this, _Item);
\r
858 /// Remove an item from the bounded_buffer.
\r
861 /// The message payload.
\r
865 return receive<_Type>(this);
\r
871 /// The main propagate() function for ITarget blocks. Called by a source
\r
872 /// block, generally within an asynchronous task to send messages to its targets.
\r
874 /// <param name="_PMessage">
\r
875 /// A pointer to the message.
\r
877 /// <param name="_PSource">
\r
878 /// A pointer to the source block offering the message.
\r
881 /// An indication of what the target decided to do with the message.
\r
884 /// It is important that calls to propagate do *not* take the same lock on the
\r
885 /// internal structure that is used by Consume and the LWT. Doing so could
\r
886 /// result in a deadlock with the Consume call. (in the case of the bounded_buffer,
\r
887 /// this lock is the m_internalLock)
\r
889 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
891 message_status _Result = accepted;
\r
893 // Check current capacity.
\r
894 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
896 // Postpone the message, buffer is full.
\r
897 _InterlockedDecrement(&_M_currentSize);
\r
898 _Result = postponed;
\r
900 // Save off the message id from this source to later try
\r
901 // and reserve/consume when more space is free.
\r
903 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
904 _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();
\r
912 // Accept the message being propagated
\r
913 // Note: depending on the source block propagating the message
\r
914 // this may not necessarily be the same message (pMessage) first
\r
915 // passed into the function.
\r
917 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
919 if (_PMessage != NULL)
\r
921 async_send(_PMessage);
\r
925 // Didn't get a message so need to decrement.
\r
926 _InterlockedDecrement(&_M_currentSize);
\r
936 /// Synchronously sends a message to this block. When this function completes the message will
\r
937 /// already have propagated into the block.
\r
939 /// <param name="_PMessage">
\r
940 /// A pointer to the message.
\r
942 /// <param name="_PSource">
\r
943 /// A pointer to the source block offering the message.
\r
946 /// An indication of what the target decided to do with the message.
\r
948 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
950 message_status _Result = accepted;
\r
952 // Check current capacity.
\r
953 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
955 // Postpone the message, buffer is full.
\r
956 _InterlockedDecrement(&_M_currentSize);
\r
957 _Result = postponed;
\r
959 // Save off the message id from this source to later try
\r
960 // and reserve/consume when more space is free.
\r
962 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
963 _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();
\r
971 // Accept the message being propagated
\r
972 // Note: depending on the source block propagating the message
\r
973 // this may not necessarily be the same message (pMessage) first
\r
974 // passed into the function.
\r
976 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
978 if (_PMessage != NULL)
\r
980 async_send(_PMessage);
\r
984 // Didn't get a message so need to decrement.
\r
985 _InterlockedDecrement(&_M_currentSize);
\r
995 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
997 /// <param name="_MsgId">
\r
998 /// The runtime object identity of the message.
\r
1001 /// A pointer to the message that the caller now has ownership of.
\r
1003 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
1006 // Peek at the head message in the message buffer. If the Ids match
\r
1007 // dequeue and transfer ownership
\r
1009 message<_Type> * _Msg = NULL;
\r
1011 if (_M_messageBuffer.is_head(_MsgId))
\r
1013 _Msg = _M_messageBuffer.dequeue();
\r
1015 // Give preference to any previously postponed messages
\r
1016 // before decrementing current size.
\r
1017 if(!try_consume_msg())
\r
1019 _InterlockedDecrement(&_M_currentSize);
\r
1027 /// Try to reserve and consume a message from list of saved message ids.
\r
1030 /// True if a message was successfully consumed, false otherwise.
\r
1032 bool try_consume_msg()
\r
1034 runtime_object_identity _ReservedId = -1;
\r
1035 ISource<_Type> * _PSource = NULL;
\r
1037 // Walk through source links seeing if any saved ids exist.
\r
1038 bool _ConsumedMsg = true;
\r
1039 while(_ConsumedMsg)
\r
1041 source_iterator _Iter = _M_connectedSources.begin();
\r
1043 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
1044 for (; *_Iter != NULL; ++_Iter)
\r
1046 _PSource = *_Iter;
\r
1047 std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;
\r
1048 if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())
\r
1050 _ReservedId = _MapIter->second;
\r
1051 _M_savedSourceMsgIds.erase(_MapIter);
\r
1057 // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.
\r
1058 if(_ReservedId != -1)
\r
1060 if(_PSource->reserve(_ReservedId, this))
\r
1062 message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);
\r
1063 async_send(_ConsumedMsg);
\r
1066 // Reserve failed or link was removed,
\r
1067 // go back and try and find a different msg id.
\r
1074 // If this point is reached the map of source ids was empty.
\r
1082 /// Reserves a message previously offered by the source.
\r
1084 /// <param name="_MsgId">
\r
1085 /// The runtime object identity of the message.
\r
1088 /// A Boolean indicating whether the reservation worked or not.
\r
1091 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
1093 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
1095 // Allow reservation if this is the head message
\r
1096 return _M_messageBuffer.is_head(_MsgId);
\r
1100 /// Consumes a message that was reserved previously.
\r
1102 /// <param name="_MsgId">
\r
1103 /// The runtime object identity of the message.
\r
1106 /// A pointer to the message that the caller now has ownership of.
\r
1109 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
1111 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
1113 // By default, accept the message
\r
1114 return accept_message(_MsgId);
\r
1118 /// Releases a previous message reservation.
\r
1120 /// <param name="_MsgId">
\r
1121 /// The runtime object identity of the message.
\r
1123 virtual void release_message(runtime_object_identity _MsgId)
\r
1125 // The head message is the one reserved.
\r
1126 if (!_M_messageBuffer.is_head(_MsgId))
\r
1128 throw message_not_found();
\r
1133 /// Resumes propagation after a reservation has been released
\r
1135 virtual void resume_propagation()
\r
1137 // If there are any messages in the buffer, propagate them out
\r
1138 if (_M_messageBuffer.count() > 0)
\r
1145 /// Notification that a target was linked to this source.
\r
1147 /// <param name="_PTarget">
\r
1148 /// A pointer to the newly linked target.
\r
1150 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
1152 // If the message queue is blocked due to reservation
\r
1153 // there is no need to do any message propagation
\r
1154 if (_M_pReservedFor != NULL)
\r
1159 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
1163 // Propagate the head message to the new target
\r
1164 message_status _Status = _PTarget->propagate(_Msg, this);
\r
1166 if (_Status == accepted)
\r
1168 // The target accepted the message, restart propagation.
\r
1169 propagate_to_any_targets(NULL);
\r
1172 // If the status is anything other than accepted, then leave
\r
1173 // the message queue blocked.
\r
1178 /// Takes the message and propagates it to all the targets of this bounded_buffer.
\r
1179 /// This is called from async_send.
\r
1181 /// <param name="_PMessage">
\r
1182 /// A pointer to a new message.
\r
1184 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
1186 // Enqueue pMessage to the internal message buffer if it is non-NULL.
\r
1187 // pMessage can be NULL if this LWT was the result of a Repropagate call
\r
1188 // out of a Consume or Release (where no new message is queued up, but
\r
1189 // everything remaining in the bounded buffer needs to be propagated out)
\r
1190 if (_PMessage != NULL)
\r
1192 _M_messageBuffer.enqueue(_PMessage);
\r
1194 // If the incoming pMessage is not the head message, we can safely assume that
\r
1195 // the head message is blocked and waiting on Consume(), Release() or a new
\r
1196 // link_target() and cannot be propagated out.
\r
1197 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
\r
1203 _Propagate_priority_order();
\r
1207 // While current size is less than capacity try to consume
\r
1208 // any previously offered ids.
\r
1209 bool _ConsumedMsg = true;
\r
1210 while(_ConsumedMsg)
\r
1212 // Assume a message will be found to successfully consume in the
\r
1213 // saved ids, if not this will be decremented afterwards.
\r
1214 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
1219 _ConsumedMsg = try_consume_msg();
\r
1222 // Decrement the current size, we broke out of the previous loop
\r
1223 // because we reached capacity or there were no more messages to consume.
\r
1224 _InterlockedDecrement(&_M_currentSize);
\r
1231 /// Attempts to propagate out any messages currently in the block.
\r
1233 void _Propagate_priority_order()
\r
1235 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
1237 // If someone has reserved the _Head message, don't propagate anymore
\r
1238 if (_M_pReservedFor != NULL)
\r
1243 while (_Msg != NULL)
\r
1245 message_status _Status = declined;
\r
1247 // Always start from the first target that linked.
\r
1248 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
1250 ITarget<_Target_type> * _PTarget = *_Iter;
\r
1251 _Status = _PTarget->propagate(_Msg, this);
\r
1253 // Ownership of message changed. Do not propagate this
\r
1254 // message to any other target.
\r
1255 if (_Status == accepted)
\r
1260 // If the target we just propagated to has reserved this message, stop
\r
1261 // propagating it to others.
\r
1262 if (_M_pReservedFor != NULL)
\r
1268 // If status is anything other than accepted, then the head message
\r
1269 // was not propagated out. Thus, nothing after it in the queue can
\r
1270 // be propagated out. Cease propagation.
\r
1271 if (_Status != accepted)
\r
1276 // Get the next message
\r
1277 _Msg = _M_messageBuffer.peek();
\r
1282 /// Message buffer used to store messages.
\r
1284 MessageQueue<_Type> _M_messageBuffer;
\r
1287 /// Maximum number of messages bounded_buffer can hold.
\r
1289 const size_t _M_capacity;
\r
1292 /// Current number of messages in bounded_buffer.
\r
1294 volatile long _M_currentSize;
\r
1297 /// Lock used to guard saved message ids map.
\r
1299 critical_section _M_savedIdsLock;
\r
1302 /// Map of source links to saved message ids.
\r
1304 std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;
\r
1307 // Hide assignment operator and copy constructor
\r
1309 bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator
\r
1310 bounded_buffer(bounded_buffer const &); // no copy constructor
\r
1314 /// A simple alternator, offers messages in order to each target
\r
1315 /// one at a time. If a consume occurs a message won't be offered to that target again
\r
1316 /// until all others are given a chance. This causes messages to be distributed more
\r
1317 /// evenly among targets.
\r
1319 /// <typeparam name="_Type">
\r
1320 /// The payload type of messages stored and propagated by the buffer.
\r
1322 template<class _Type>
\r
1323 class alternator : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
1327 /// Creates an alternator within the default scheduler, and places it in any schedule
\r
1328 /// group of the scheduler's choosing.
\r
1331 : _M_indexNextTarget(0)
\r
1333 initialize_source_and_target();
\r
1337 /// Creates an alternator within the default scheduler, and places it in any schedule
\r
1338 /// group of the scheduler's choosing.
\r
1340 /// <param name="_Filter">
\r
1341 /// A reference to a filter function.
\r
1343 alternator(filter_method const& _Filter)
\r
1344 : _M_indexNextTarget(0)
\r
1346 initialize_source_and_target();
\r
1347 register_filter(_Filter);
\r
1351 /// Creates an alternator within the specified scheduler, and places it any schedule
\r
1352 /// group of the scheduler's choosing.
\r
1354 /// <param name="_PScheduler">
\r
1355 /// A reference to a scheduler instance.
\r
1357 alternator(Scheduler& _PScheduler)
\r
1358 : _M_indexNextTarget(0)
\r
1360 initialize_source_and_target(&_PScheduler);
\r
1364 /// Creates an alternator within the specified scheduler, and places it in any schedule
\r
1365 /// group of the scheduler's choosing.
\r
1367 /// <param name="_PScheduler">
\r
1368 /// A reference to a scheduler instance.
\r
1370 /// <param name="_Filter">
\r
1371 /// A reference to a filter function.
\r
1373 alternator(Scheduler& _PScheduler, filter_method const& _Filter)
\r
1374 : _M_indexNextTarget(0)
\r
1376 initialize_source_and_target(&_PScheduler);
\r
1377 register_filter(_Filter);
\r
1381 /// Creates an alternator within the specified schedule group. The scheduler is implied
\r
1382 /// by the schedule group.
\r
1384 /// <param name="_PScheduleGroup">
\r
1385 /// A reference to a schedule group.
\r
1387 alternator(ScheduleGroup& _PScheduleGroup)
\r
1388 : _M_indexNextTarget(0)
\r
1390 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
1394 /// Creates an alternator within the specified schedule group. The scheduler is implied
\r
1395 /// by the schedule group.
\r
1397 /// <param name="_PScheduleGroup">
\r
1398 /// A reference to a schedule group.
\r
1400 /// <param name="_Filter">
\r
1401 /// A reference to a filter function.
\r
1403 alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
1404 : _M_indexNextTarget(0)
\r
1406 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
1407 register_filter(_Filter);
\r
1411 /// Cleans up any resources that may have been created by the alternator.
\r
1415 // Remove all links
\r
1416 remove_network_links();
\r
1422 /// The main propagate() function for ITarget blocks. Called by a source
\r
1423 /// block, generally within an asynchronous task to send messages to its targets.
\r
1425 /// <param name="_PMessage">
\r
1426 /// A pointer to the message
\r
1428 /// <param name="_PSource">
\r
1429 /// A pointer to the source block offering the message.
\r
1432 /// An indication of what the target decided to do with the message.
\r
1435 /// It is important that calls to propagate do *not* take the same lock on the
\r
1436 /// internal structure that is used by Consume and the LWT. Doing so could
\r
1437 /// result in a deadlock with the Consume call. (in the case of the alternator,
\r
1438 /// this lock is the m_internalLock)
\r
1440 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
1442 message_status _Result = accepted;
\r
1444 // Accept the message being propagated
\r
1445 // Note: depending on the source block propagating the message
\r
1446 // this may not necessarily be the same message (pMessage) first
\r
1447 // passed into the function.
\r
1449 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
1451 if (_PMessage != NULL)
\r
1453 async_send(_PMessage);
\r
1464 /// Synchronously sends a message to this block. When this function completes the message will
\r
1465 /// already have propagated into the block.
\r
1467 /// <param name="_PMessage">
\r
1468 /// A pointer to the message.
\r
1470 /// <param name="_PSource">
\r
1471 /// A pointer to the source block offering the message.
\r
1474 /// An indication of what the target decided to do with the message.
\r
1476 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
1478 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
1480 if (_PMessage != NULL)
\r
1482 sync_send(_PMessage);
\r
1493 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
1495 /// <param name="_MsgId">
\r
1496 /// The runtime object identity of the message.
\r
1499 /// A pointer to the message that the caller now has ownership of.
\r
1501 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
1504 // Peek at the head message in the message buffer. If the Ids match
\r
1505 // dequeue and transfer ownership
\r
1507 message<_Type> * _Msg = NULL;
\r
1509 if (_M_messageBuffer.is_head(_MsgId))
\r
1511 _Msg = _M_messageBuffer.dequeue();
\r
1518 /// Reserves a message previously offered by the source.
\r
1520 /// <param name="_MsgId">
\r
1521 /// The runtime object identity of the message.
\r
1524 /// A Boolean indicating whether the reservation worked or not.
\r
1527 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
1529 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
1531 // Allow reservation if this is the head message
\r
1532 return _M_messageBuffer.is_head(_MsgId);
\r
1536 /// Consumes a message that was reserved previously.
\r
1538 /// <param name="_MsgId">
\r
1539 /// The runtime object identity of the message.
\r
1542 /// A pointer to the message that the caller now has ownership of.
\r
1545 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
1547 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
1549 // Update so we don't offer to this target again until
\r
1550 // all others have a chance.
\r
1551 target_iterator _CurrentIter = _M_connectedTargets.begin();
\r
1552 for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i)
\r
1554 if(*_CurrentIter == _M_pReservedFor)
\r
1556 _M_indexNextTarget = i + 1;
\r
1561 // By default, accept the message
\r
1562 return accept_message(_MsgId);
\r
1566 /// Releases a previous message reservation.
\r
1568 /// <param name="_MsgId">
\r
1569 /// The runtime object identity of the message.
\r
1571 virtual void release_message(runtime_object_identity _MsgId)
\r
1573 // The head message is the one reserved.
\r
1574 if (!_M_messageBuffer.is_head(_MsgId))
\r
1576 throw message_not_found();
\r
1581 /// Resumes propagation after a reservation has been released.
\r
1583 virtual void resume_propagation()
\r
1585 // If there are any messages in the buffer, propagate them out
\r
1586 if (_M_messageBuffer.count() > 0)
\r
1593 /// Notification that a target was linked to this source.
\r
1595 /// <param name="_PTarget">
\r
1596 /// A pointer to the newly linked target.
\r
1598 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
1600 // If the message queue is blocked due to reservation
\r
1601 // there is no need to do any message propagation
\r
1602 if (_M_pReservedFor != NULL)
\r
1607 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
1611 // Propagate the head message to the new target
\r
1612 message_status _Status = _PTarget->propagate(_Msg, this);
\r
1614 if (_Status == accepted)
\r
1616 // The target accepted the message, restart propagation.
\r
1617 propagate_to_any_targets(NULL);
\r
1620 // If the status is anything other than accepted, then leave
\r
1621 // the message queue blocked.
\r
1626 /// Takes the message and propagates it to all the targets of this alternator.
\r
1627 /// This is called from async_send.
\r
1629 /// <param name="_PMessage">
\r
1630 /// A pointer to a new message.
\r
1632 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
1634 // Enqueue pMessage to the internal buffer queue if it is non-NULL.
\r
1635 // pMessage can be NULL if this LWT was the result of a Repropagate call
\r
1636 // out of a Consume or Release (where no new message is queued up, but
\r
1637 // everything remaining in the message buffer needs to be propagated out)
\r
1638 if (_PMessage != NULL)
\r
1640 _M_messageBuffer.enqueue(_PMessage);
\r
1642 // If the incoming pMessage is not the head message, we can safely assume that
\r
1643 // the head message is blocked and waiting on Consume(), Release() or a new
\r
1645 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
\r
1651 // Attempt to propagate messages to targets in order last left off.
\r
1652 _Propagate_alternating_order();
\r
1656 /// Offers messages to targets in alternating order to help distribute messages
\r
1657 /// evenly among targets.
\r
1659 void _Propagate_alternating_order()
\r
1661 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
1663 // If someone has reserved the _Head message, don't propagate anymore
\r
1664 if (_M_pReservedFor != NULL)
\r
1670 // Try to start where left off before, if the link has been removed
\r
1671 // or this is the first time then start at the beginning.
\r
1673 target_iterator _CurrentIter = _M_connectedTargets.begin();
\r
1674 const target_iterator _FirstLinkIter(_CurrentIter);
\r
1675 for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {}
\r
1677 while (_Msg != NULL)
\r
1679 message_status _Status = declined;
\r
1681 // Loop offering message until end of links is reached.
\r
1682 target_iterator _StartedIter(_CurrentIter);
\r
1683 for(;*_CurrentIter != NULL; ++_CurrentIter)
\r
1685 _Status = (*_CurrentIter)->propagate(_Msg, this);
\r
1686 ++_M_indexNextTarget;
\r
1688 // Ownership of message changed. Do not propagate this
\r
1689 // message to any other target.
\r
1690 if (_Status == accepted)
\r
1696 // If the target just propagated to reserved this message, stop
\r
1697 // propagating it to others
\r
1698 if (_M_pReservedFor != NULL)
\r
1704 // Message ownership changed; go to the next message.
\r
1705 if (_Status == accepted)
\r
1710 // Try starting from the beginning until the first link offering was started at.
\r
1711 _M_indexNextTarget = 0;
\r
1712 for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter)
\r
1714 // I have offered the same message to all links now so stop.
\r
1715 if(*_CurrentIter == *_StartedIter)
\r
1720 _Status = (*_CurrentIter)->propagate(_Msg, this);
\r
1721 ++_M_indexNextTarget;
\r
1723 // Ownership of message changed. Do not propagate this
\r
1724 // message to any other target.
\r
1725 if (_Status == accepted)
\r
1731 // If the target just propagated to reserved this message, stop
\r
1732 // propagating it to others
\r
1733 if (_M_pReservedFor != NULL)
\r
1739 // If status is anything other than accepted, then the head message
\r
1740 // was not propagated out. Thus, nothing after it in the queue can
\r
1741 // be propagated out. Cease propagation.
\r
1742 if (_Status != accepted)
\r
1747 // Get the next message
\r
1748 _Msg = _M_messageBuffer.peek();
\r
1755 /// Message queue used to store messages.
\r
1757 MessageQueue<_Type> _M_messageBuffer;
\r
1760 /// Index of next target to call propagate on. Used to alternate and load
\r
1761 /// balance message offering.
\r
1763 size_t _M_indexNextTarget;
\r
1766 // Hide assignment operator and copy constructor.
\r
1768 alternator const &operator =(alternator const&); // no assignment operator
\r
1769 alternator(alternator const &); // no copy constructor
\r
1772 #include <agents.h>
\r
1775 // Sample block that combines join and transform.
\r
1777 template<class _Input, class _Output, join_type _Jtype = non_greedy>
\r
1778 class join_transform : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
\r
1782 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;
\r
1785 /// Creates a join block within the default scheduler, and places it in any schedule
\r
1786 /// group of the scheduler's choosing.
\r
1788 /// <param name="_NumInputs">
\r
1789 /// The number of inputs this join will be allowed
\r
1791 join_transform(size_t _NumInputs, _Transform_method const& _Func)
\r
1792 : _M_messageArray(_NumInputs, 0),
\r
1793 _M_savedMessageIdArray(_NumInputs, -1),
\r
1796 _Initialize(_NumInputs);
\r
1800 /// Creates a join block within the default scheduler, and places it in any schedule
\r
1801 /// group of the scheduler's choosing.
\r
1803 /// <param name="_NumInputs">
\r
1804 /// The number of inputs this join will be allowed
\r
1806 /// <param name="_Filter">
\r
1807 /// A filter method placed on this join
\r
1809 join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1810 : _M_messageArray(_NumInputs, 0),
\r
1811 _M_savedMessageIdArray(_NumInputs, -1),
\r
1814 _Initialize(_NumInputs);
\r
1815 register_filter(_Filter);
\r
1819 /// Creates a join block within the specified scheduler, and places it in any schedule
\r
1820 /// group of the scheduler's choosing.
\r
1822 /// <param name="_Scheduler">
\r
1823 /// The scheduler onto which the task's message propagation will be scheduled.
\r
1825 /// <param name="_NumInputs">
\r
1826 /// The number of inputs this join will be allowed
\r
1828 join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func)
\r
1829 : _M_messageArray(_NumInputs, 0),
\r
1830 _M_savedMessageIdArray(_NumInputs, -1),
\r
1833 _Initialize(_NumInputs, &_PScheduler);
\r
1837 /// Creates a join block within the specified scheduler, and places it in any schedule
\r
1838 /// group of the scheduler's choosing.
\r
1840 /// <param name="_Scheduler">
\r
1841 /// The scheduler onto which the task's message propagation will be scheduled.
\r
1843 /// <param name="_NumInputs">
\r
1844 /// The number of inputs this join will be allowed
\r
1846 /// <param name="_Filter">
\r
1847 /// A filter method placed on this join
\r
1849 join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1850 : _M_messageArray(_NumInputs, 0),
\r
1851 _M_savedMessageIdArray(_NumInputs, -1),
\r
1854 _Initialize(_NumInputs, &_PScheduler);
\r
1855 register_filter(_Filter);
\r
1859 /// Creates a join block within the specified schedule group. The scheduler is implied
\r
1860 /// by the schedule group.
\r
1862 /// <param name="_PScheduleGroup">
\r
1863 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
1865 /// <param name="_NumInputs">
\r
1866 /// The number of inputs this join will be allowed
\r
1868 join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func)
\r
1869 : _M_messageArray(_NumInputs, 0),
\r
1870 _M_savedMessageIdArray(_NumInputs, -1),
\r
1873 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
1877 /// Creates a join block within the specified schedule group. The scheduler is implied
\r
1878 /// by the schedule group.
\r
1880 /// <param name="_PScheduleGroup">
\r
1881 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
1883 /// <param name="_NumInputs">
\r
1884 /// The number of inputs this join will be allowed
\r
1886 /// <param name="_Filter">
\r
1887 /// A filter method placed on this join
\r
1889 join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1890 : _M_messageArray(_NumInputs, 0),
\r
1891 _M_savedMessageIdArray(_NumInputs, -1),
\r
1894 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
1895 register_filter(_Filter);
\r
1899 /// Destroys a join
\r
1903 // Remove all links that are targets of this join
\r
1904 remove_network_links();
\r
1906 delete [] _M_savedIdBuffer;
\r
1911 // propagator_block protected function implementations
\r
1915 /// The main propagate() function for ITarget blocks. Called by a source
\r
1916 /// block, generally within an asynchronous task to send messages to its targets.
\r
1918 /// <param name="_PMessage">
\r
1919 /// The message being propagated
\r
1921 /// <param name="_PSource">
\r
1922 /// The source doing the propagation
\r
1925 /// An indication of what the target decided to do with the message.
\r
1928 /// It is important that calls to propagate do *not* take the same lock on the
\r
1929 /// internal structure that is used by Consume and the LWT. Doing so could
\r
1930 /// result in a deadlock with the Consume call.
\r
1932 message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource)
\r
1934 message_status _Ret_val = accepted;
\r
1937 // Find the slot index of this source
\r
1940 bool _Found = false;
\r
1941 for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
\r
1943 if (*_Iter == _PSource)
\r
1954 // If this source was not found in the array, this is not a connected source
\r
1955 // decline the message
\r
1959 _ASSERTE(_Slot < _M_messageArray.size());
\r
1961 bool fIsGreedy = (_Jtype == greedy);
\r
1966 // Greedy type joins immediately accept the message.
\r
1969 critical_section::scoped_lock lockHolder(_M_propagationLock);
\r
1970 if (_M_messageArray[_Slot] != NULL)
\r
1972 _M_savedMessageIdArray[_Slot] = _PMessage->msg_id();
\r
1973 _Ret_val = postponed;
\r
1977 if (_Ret_val != postponed)
\r
1979 _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
\r
1981 if (_M_messageArray[_Slot] != NULL)
\r
1983 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
1985 // If messages have arrived on all links, start a propagation
\r
1986 // of the current message
\r
1992 _Ret_val = missed;
\r
1999 // Non-greedy type joins save the message ids until they have all arrived
\r
2002 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)
\r
2004 // Decrement the message remaining count if this thread is switching
\r
2005 // the saved id from -1 to a valid value.
\r
2006 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
2012 // Always return postponed. This message will be consumed
\r
2014 _Ret_val = postponed;
\r
2021 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
2023 /// <param name="_MsgId">
\r
2024 /// The runtime object identity of the message.
\r
2027 /// A pointer to the message that the caller now has ownership of.
\r
2029 virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
\r
2032 // Peek at the head message in the message buffer. If the Ids match
\r
2033 // dequeue and transfer ownership
\r
2035 message<_Output> * _Msg = NULL;
\r
2037 if (_M_messageBuffer.is_head(_MsgId))
\r
2039 _Msg = _M_messageBuffer.dequeue();
\r
2046 /// Reserves a message previously offered by the source.
\r
2048 /// <param name="_MsgId">
\r
2049 /// The runtime object identity of the message.
\r
2052 /// A bool indicating whether the reservation worked or not
\r
2055 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
2057 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
2059 // Allow reservation if this is the head message
\r
2060 return _M_messageBuffer.is_head(_MsgId);
\r
2064 /// Consumes a message previously offered by the source and reserved by the target,
\r
2065 /// transferring ownership to the caller.
\r
2067 /// <param name="_MsgId">
\r
2068 /// The runtime object identity of the message.
\r
2071 /// A pointer to the message that the caller now has ownership of.
\r
2074 /// Similar to 'accept', but is always preceded by a call to 'reserve'
\r
2076 virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
\r
2078 // By default, accept the message
\r
2079 return accept_message(_MsgId);
\r
2083 /// Releases a previous message reservation.
\r
2085 /// <param name="_MsgId">
\r
2086 /// The runtime object identity of the message.
\r
2088 virtual void release_message(runtime_object_identity _MsgId)
\r
2090 // The head message is the one reserved.
\r
2091 if (!_M_messageBuffer.is_head(_MsgId))
\r
2093 throw message_not_found();
\r
2098 /// Resumes propagation after a reservation has been released
\r
2100 virtual void resume_propagation()
\r
2102 // If there are any messages in the buffer, propagate them out
\r
2103 if (_M_messageBuffer.count() > 0)
\r
2110 /// Notification that a target was linked to this source.
\r
2112 /// <param name="_PTarget">
\r
2113 /// A pointer to the newly linked target.
\r
2115 virtual void link_target_notification(ITarget<_Output> *)
\r
2117 // If the message queue is blocked due to reservation
\r
2118 // there is no need to do any message propagation
\r
2119 if (_M_pReservedFor != NULL)
\r
2124 _Propagate_priority_order(_M_messageBuffer);
\r
2128 /// Takes the message and propagates it to the target of this join.
\r
2129 /// This is called from async_send.
\r
2131 /// <param name="_PMessage">
\r
2132 /// The message being propagated
\r
2134 void propagate_to_any_targets(message<_Output> *)
\r
2136 message<_Output> * _Msg = NULL;
\r
2137 // Create a new message from the input sources
\r
2138 // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
\r
2139 // a consume or release from the target. In that case we don't want to create a new message.
\r
2140 if (_M_messagesRemaining == 0)
\r
2142 // A greedy join can immediately create the message, a non-greedy
\r
2143 // join must try and consume all the messages it has postponed
\r
2144 _Msg = _Create_new_message();
\r
2149 // Create message failed. This happens in non_greedy joins when the
\r
2150 // reserve/consumption of a postponed message failed.
\r
2151 _Propagate_priority_order(_M_messageBuffer);
\r
2155 bool fIsGreedy = (_Jtype == greedy);
\r
2157 // For a greedy join, reset the number of messages remaining
\r
2158 // Check to see if multiple messages have been passed in on any of the links,
\r
2159 // and postponed. If so, try and reserve/consume them now
\r
2162 // Look at the saved ids and reserve/consume any that have passed in while
\r
2163 // this join was waiting to complete
\r
2164 _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size());
\r
2166 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2170 runtime_object_identity _Saved_id;
\r
2171 // Grab the current saved id value. This value could be changing based on any
\r
2172 // calls of source->propagate(this). If the message id is different than what is snapped
\r
2173 // here, that means, the reserve below must fail. This is because reserve is trying
\r
2174 // to get the same source lock the propagate(this) call must be holding.
\r
2176 critical_section::scoped_lock lockHolder(_M_propagationLock);
\r
2178 _ASSERTE(_M_messageArray[i] != NULL);
\r
2180 _Saved_id = _M_savedMessageIdArray[i];
\r
2182 if (_Saved_id == -1)
\r
2184 _M_messageArray[i] = NULL;
\r
2189 _M_savedMessageIdArray[i] = -1;
\r
2193 if (_Saved_id != -1)
\r
2195 source_iterator _Iter = _M_connectedSources.begin();
\r
2197 ISource<_Input> * _PSource = _Iter[i];
\r
2198 if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
\r
2200 _M_messageArray[i] = _PSource->consume(_Saved_id, this);
\r
2201 _InterlockedDecrementSizeT(&_M_messagesRemaining);
\r
2208 // If messages have all been received, async_send again, this will start the
\r
2209 // LWT up to create a new message
\r
2210 if (_M_messagesRemaining == 0)
\r
2216 // Add the new message to the outbound queue
\r
2217 _M_messageBuffer.enqueue(_Msg);
\r
2219 if (!_M_messageBuffer.is_head(_Msg->msg_id()))
\r
2221 // another message is at the head of the outbound message queue and blocked
\r
2226 _Propagate_priority_order(_M_messageBuffer);
\r
2232 // Private Methods
\r
2236 /// Propagate messages in priority order
\r
2238 /// <param name="_MessageBuffer">
\r
2239 /// Reference to a message queue with messages to be propagated
\r
2241 void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)
\r
2243 message<_Output> * _Msg = _MessageBuffer.peek();
\r
2245 // If someone has reserved the _Head message, don't propagate anymore
\r
2246 if (_M_pReservedFor != NULL)
\r
2251 while (_Msg != NULL)
\r
2253 message_status _Status = declined;
\r
2255 // Always start from the first target that linked
\r
2256 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
2258 ITarget<_Output> * _PTarget = *_Iter;
\r
2259 _Status = _PTarget->propagate(_Msg, this);
\r
2261 // Ownership of message changed. Do not propagate this
\r
2262 // message to any other target.
\r
2263 if (_Status == accepted)
\r
2268 // If the target just propagated to reserved this message, stop
\r
2269 // propagating it to others
\r
2270 if (_M_pReservedFor != NULL)
\r
2276 // If status is anything other than accepted, then the head message
\r
2277 // was not propagated out. Thus, nothing after it in the queue can
\r
2278 // be propagated out. Cease propagation.
\r
2279 if (_Status != accepted)
\r
2284 // Get the next message
\r
2285 _Msg = _MessageBuffer.peek();
\r
2290 /// Create a new message from the data output
\r
2293 /// The created message (NULL if creation failed)
\r
2295 message<_Output> * __cdecl _Create_new_message()
\r
2297 bool fIsNonGreedy = (_Jtype == non_greedy);
\r
2299 // If this is a non-greedy join, check each source and try to consume their message
\r
2303 // The iterator _Iter below will ensure that it is safe to touch
\r
2304 // non-NULL source pointers. Take a snapshot.
\r
2305 std::vector<ISource<_Input> *> _Sources;
\r
2306 source_iterator _Iter = _M_connectedSources.begin();
\r
2308 while (*_Iter != NULL)
\r
2310 ISource<_Input> * _PSource = *_Iter;
\r
2312 if (_PSource == NULL)
\r
2317 _Sources.push_back(_PSource);
\r
2321 if (_Sources.size() != _M_messageArray.size())
\r
2323 // Some of the sources were unlinked. The join is broken
\r
2327 // First, try and reserve all the messages. If a reservation fails,
\r
2328 // then release any reservations that had been made.
\r
2329 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)
\r
2331 // Snap the current saved id into a buffer. This value can be changing behind the scenes from
\r
2332 // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
\r
2334 _InterlockedIncrementSizeT(&_M_messagesRemaining);
\r
2335 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);
\r
2337 _ASSERTE(_M_savedIdBuffer[i] != -1);
\r
2339 if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
\r
2341 // A reservation failed, release all reservations made up until
\r
2342 // this block, and wait for another message to arrive on this link
\r
2343 for (size_t j = 0; j < i; j++)
\r
2345 _Sources[j]->release(_M_savedIdBuffer[j], this);
\r
2346 if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1)
\r
2348 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
2355 // Return NULL to indicate that the create failed
\r
2360 // Since everything has been reserved, consume all the messages.
\r
2361 // This is guaranteed to return true.
\r
2362 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2364 _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
\r
2365 _M_savedIdBuffer[i] = -1;
\r
2369 if (!fIsNonGreedy)
\r
2371 // Reinitialize how many messages are being waited for.
\r
2372 // This is safe because all messages have been received, thus no new async_sends for
\r
2373 // greedy joins can be called.
\r
2374 _M_messagesRemaining = _M_messageArray.size();
\r
2377 std::vector<_Input> _OutputVector;
\r
2378 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2380 _ASSERTE(_M_messageArray[i] != NULL);
\r
2381 _OutputVector.push_back(_M_messageArray[i]->payload);
\r
2383 delete _M_messageArray[i];
\r
2386 _M_messageArray[i] = NULL;
\r
2390 _Output _Out = _M_pFunc(_OutputVector);
\r
2392 return (new message<_Output>(_Out));
\r
2396 /// Initialize the join block
\r
2398 /// <param name="_NumInputs">
\r
2399 /// The number of inputs
\r
2401 void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
\r
2403 initialize_source_and_target(_PScheduler, _PScheduleGroup);
\r
2405 _M_connectedSources.set_bound(_NumInputs);
\r
2406 _M_messagesRemaining = _NumInputs;
\r
2408 bool fIsNonGreedy = (_Jtype == non_greedy);
\r
2412 // Non greedy joins need a buffer to snap off saved message ids to.
\r
2413 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
\r
2414 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
\r
2418 _M_savedIdBuffer = NULL;
\r
2422 // The current number of messages remaining
\r
2423 volatile size_t _M_messagesRemaining;
\r
2425 // An array containing the accepted messages of this join.
\r
2426 std::vector<message<_Input>*> _M_messageArray;
\r
2428 // An array containing the msg ids of messages propagated to the array
\r
2429 // For greedy joins, this contains a log of other messages passed to this
\r
2430 // join after the first has been accepted
\r
2431 // For non-greedy joins, this contains the message id of any message
\r
2433 std::vector<runtime_object_identity> _M_savedMessageIdArray;
\r
2435 // The transformer method called by this block
\r
2436 _Transform_method _M_pFunc;
\r
2438 // Buffer for snapping saved ids in non-greedy joins
\r
2439 runtime_object_identity * _M_savedIdBuffer;
\r
2441 // A lock for modifying the buffer or the connected blocks
\r
2442 ::Concurrency::critical_section _M_propagationLock;
\r
2444 // Queue to hold output messages
\r
2445 MessageQueue<_Output> _M_messageBuffer;
\r
2449 // Message block that invokes a transform method when it receives a message on any of the input links.
\r
2450 // A typical example is the recalculation engine for a cell in an Excel spreadsheet.
\r
2451 // (Remember that a normal join block is triggered only when it receives messages on all its input links).
\r
2453 template<class _Input, class _Output>
\r
2454 class recalculate : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
\r
2457 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method;
\r
2460 /// Creates a recalculate block within the default scheduler, and places it in any schedule
\r
2461 /// group of the scheduler's choosing.
\r
2463 /// <param name="_NumInputs">
\r
2464 /// The number of inputs
\r
2466 recalculate(size_t _NumInputs, _Recalculate_method const& _Func)
\r
2467 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2470 _Initialize(_NumInputs);
\r
2474 /// Creates a recalculate block within the default scheduler, and places it in any schedule
\r
2475 /// group of the scheduler's choosing.
\r
2477 /// <param name="_NumInputs">
\r
2478 /// The number of inputs
\r
2480 /// <param name="_Filter">
\r
2481 /// A filter method placed on this recalculate block
\r
2483 recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
2484 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2487 _Initialize(_NumInputs);
\r
2488 register_filter(_Filter);
\r
2492 /// Creates a recalculate block within the specified scheduler, and places it in any schedule
\r
2493 /// group of the scheduler's choosing.
\r
2495 /// <param name="_Scheduler">
\r
2496 /// The scheduler onto which the task's message propagation will be scheduled.
\r
2498 /// <param name="_NumInputs">
\r
2499 /// The number of inputs
\r
2501 recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func)
\r
2502 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2505 _Initialize(_NumInputs, &_PScheduler);
\r
2509 /// Creates a recalculate block within the specified scheduler, and places it in any schedule
\r
2510 /// group of the scheduler's choosing.
\r
2512 /// <param name="_Scheduler">
\r
2513 /// The scheduler onto which the task's message propagation will be scheduled.
\r
2515 /// <param name="_NumInputs">
\r
2516 /// The number of inputs
\r
2518 /// <param name="_Filter">
\r
2519 /// A filter method placed on this recalculate block
\r
2521 recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
2522 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2525 _Initialize(_NumInputs, &_PScheduler);
\r
2526 register_filter(_Filter);
\r
2530 /// Creates a recalculate block within the specified schedule group. The scheduler is implied
\r
2531 /// by the schedule group.
\r
2533 /// <param name="_PScheduleGroup">
\r
2534 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
2536 /// <param name="_NumInputs">
\r
2537 /// The number of inputs
\r
// Constructor overload taking the input count, an explicit ScheduleGroup and
// the recalculate functor. No filter is registered in the visible body.
2539 recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func)
\r
// One message slot (initially 0) and one saved message id (initially -1) per input.
2540 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
// NULL is passed for the scheduler; only the schedule group is supplied.
2543 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
2547 /// Creates a recalculate block within the specified schedule group. The scheduler is implied
\r
2548 /// by the schedule group.
\r
2550 /// <param name="_PScheduleGroup">
\r
2551 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
2553 /// <param name="_NumInputs">
\r
2554 /// The number of inputs
\r
2556 /// <param name="_Filter">
\r
2557 /// A filter method placed on this recalculate block
\r
// Constructor overload taking the input count, an explicit ScheduleGroup, the
// recalculate functor and a message filter.
2559 recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
// One message slot (initially 0) and one saved message id (initially -1) per input.
2560 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
// NULL is passed for the scheduler; only the schedule group is supplied.
2563 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
2564 register_filter(_Filter);
\r
2568 /// Destroys the recalculate block
\r
2572 // Remove all links that are targets of this block
\r
2573 remove_network_links();
\r
// Free the id snapshot buffer that _Initialize allocates with new[].
2575 delete [] _M_savedIdBuffer;
\r
2580 // propagator_block protected function implementations
\r
2584 /// The main propagate() function for ITarget blocks. Called by a source
\r
2585 /// block, generally within an asynchronous task to send messages to its targets.
\r
2587 /// <param name="_PMessage">
\r
2588 /// The message being propagated
\r
2590 /// <param name="_PSource">
\r
2591 /// The source doing the propagation
\r
2594 /// An indication of what the target decided to do with the message.
\r
2597 /// It is important that calls to propagate do *not* take the same lock on the
\r
2598 /// internal structure that is used by Consume and the LWT. Doing so could
\r
2599 /// result in a deadlock with the Consume call.
\r
// Records the id of the message offered by _PSource in the slot belonging to
// that source; the message itself is not taken here.
2601 message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource)
\r
2604 // Find the slot index of this source
\r
2607 bool _Found = false;
\r
// Walk the connected sources up to the NULL terminator, looking for _PSource.
2608 for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
\r
2610 if (*_Iter == _PSource)
\r
2621 // If this source was not found in the array, this is not a connected source
\r
2622 // decline the message
\r
2627 // Save the message id
\r
// Atomically store the new id in the slot. A previous value of -1 means the
// slot held no pending id (_Create_new_message swaps -1 back in when it
// snapshots a slot).
2629 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)
\r
2631 // If it is not seen by Create_message attempt a recalculate
\r
2635 // Always return postponed. This message will be consumed
\r
2641 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
2643 /// <param name="_MsgId">
\r
2644 /// The runtime object identity of the message.
\r
2647 /// A pointer to the message that the caller now has ownership of.
\r
// Dequeues the head of the output queue when its id equals _MsgId.
2649 virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
\r
2652 // Peek at the head message in the message buffer. If the Ids match
\r
2653 // dequeue and transfer ownership
\r
// Stays NULL unless the head of _M_messageBuffer carries _MsgId.
2655 message<_Output> * _Msg = NULL;
\r
2657 if (_M_messageBuffer.is_head(_MsgId))
\r
2659 _Msg = _M_messageBuffer.dequeue();
\r
2666 /// Reserves a message previously offered by the source.
\r
2668 /// <param name="_MsgId">
\r
2669 /// The runtime object identity of the message.
\r
2672 /// A bool indicating whether the reservation worked or not
\r
2675 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
// Reports whether _MsgId identifies the message at the head of the output
// queue. The queue itself is not modified by this check.
2677 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
2679 // Allow reservation if this is the head message
\r
2680 return _M_messageBuffer.is_head(_MsgId);
\r
2684 /// Consumes a message previously offered by the source and reserved by the target,
\r
2685 /// transferring ownership to the caller.
\r
2687 /// <param name="_MsgId">
\r
2688 /// The runtime object identity of the message.
\r
2691 /// A pointer to the message that the caller now has ownership of.
\r
2694 /// Similar to 'accept', but is always preceded by a call to 'reserve'
\r
// Consumption is delegated to accept_message, so the same head-of-queue id
// check applies.
2696 virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
\r
2698 // By default, accept the message
\r
2699 return accept_message(_MsgId);
\r
2703 /// Releases a previous message reservation.
\r
2705 /// <param name="_MsgId">
\r
2706 /// The runtime object identity of the message.
\r
// Validates that the released id is the head of the output queue; throws
// message_not_found otherwise.
2708 virtual void release_message(runtime_object_identity _MsgId)
\r
2710 // The head message is the one reserved.
\r
2711 if (!_M_messageBuffer.is_head(_MsgId))
\r
2713 throw message_not_found();
\r
2718 /// Resumes propagation after a reservation has been released
\r
// Restarts propagation once a reservation has been released. Nothing is done
// when the output queue is empty.
2720 virtual void resume_propagation()
\r
2722 // If there are any messages in the buffer, propagate them out
\r
2723 if (_M_messageBuffer.count() > 0)
\r
2730 /// Notification that a target was linked to this source.
\r
2732 /// <param name="_PTarget">
\r
2733 /// A pointer to the newly linked target.
\r
// Called when a target is linked. The target pointer parameter is unnamed and
// not referenced in the visible body.
2735 virtual void link_target_notification(ITarget<_Output> *)
\r
2737 // If the message queue is blocked due to reservation
\r
2738 // there is no need to do any message propagation
\r
2739 if (_M_pReservedFor != NULL)
\r
// Otherwise offer the queued output messages to the connected targets.
2744 _Propagate_priority_order(_M_messageBuffer);
\r
2748 /// Takes the message and propagates it to all the targets of this block.
\r
2749 /// This is called from async_send.
\r
2751 /// <param name="_PMessage">
\r
2752 /// The message being propagated
\r
// The incoming message pointer is unnamed and ignored: the outbound message
// is produced by _Create_new_message() instead.
2754 void propagate_to_any_targets(message<_Output> *)
\r
2756 // Attempt to create a new message
\r
// NOTE(review): _Create_new_message() is documented to return NULL on
// failure; presumably that case is handled before the enqueue below - confirm.
2757 message<_Output> * _Msg = _Create_new_message();
\r
2761 // Add the new message to the outbound queue
\r
2762 _M_messageBuffer.enqueue(_Msg);
\r
// If the new message is not at the head, an older message is still queued
// ahead of it.
2764 if (!_M_messageBuffer.is_head(_Msg->msg_id()))
\r
2766 // another message is at the head of the outbound message queue and blocked
\r
2772 _Propagate_priority_order(_M_messageBuffer);
\r
2778 // Private Methods
\r
2782 /// Propagate messages in priority order
\r
2784 /// <param name="_MessageBuffer">
\r
2785 /// Reference to a message queue with messages to be propagated
\r
// Offers queued messages, head first, to the connected targets in link order.
// Propagation stops as soon as the head message is not accepted or a target
// holds a reservation.
2787 void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)
\r
// Look at the head message without removing it from the queue.
2789 message<_Output> * _Msg = _MessageBuffer.peek();
\r
2791 // If someone has reserved the _Head message, don't propagate anymore
\r
2792 if (_M_pReservedFor != NULL)
\r
2797 while (_Msg != NULL)
\r
// Start from declined so that "no target accepted" is the default outcome.
2799 message_status _Status = declined;
\r
2801 // Always start from the first target that linked
\r
2802 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
2804 ITarget<_Output> * _PTarget = *_Iter;
\r
2805 _Status = _PTarget->propagate(_Msg, this);
\r
2807 // Ownership of message changed. Do not propagate this
\r
2808 // message to any other target.
\r
2809 if (_Status == accepted)
\r
2814 // If the target just propagated to reserved this message, stop
\r
2815 // propagating it to others
\r
2816 if (_M_pReservedFor != NULL)
\r
2822 // If status is anything other than accepted, then the head message
\r
2823 // was not propagated out. Thus, nothing after it in the queue can
\r
2824 // be propagated out. Cease propagation.
\r
2825 if (_Status != accepted)
\r
2830 // Get the next message
\r
2831 _Msg = _MessageBuffer.peek();
\r
2836 /// Create a new message from the data output
\r
2839 /// The created message (NULL if creation failed)
\r
// Builds the next output message: snapshots the pending message id of every
// input, reserves and consumes the pending messages, then applies _M_pFunc to
// the payloads of all inputs. Returns a newly allocated message, or NULL when
// creation fails (see the abort comments below).
2841 message<_Output> * __cdecl _Create_new_message()
\r
2843 // If this is a non-greedy join, check each source and try to consume their message
\r
2844 size_t _NumInputs = _M_savedMessageIdArray.size();
\r
2846 // The iterator _Iter below will ensure that it is safe to touch
\r
2847 // non-NULL source pointers. Take a snapshot.
\r
2848 std::vector<ISource<_Input> *> _Sources;
\r
2849 source_iterator _Iter = _M_connectedSources.begin();
\r
2851 while (*_Iter != NULL)
\r
2853 ISource<_Input> * _PSource = *_Iter;
\r
// NOTE(review): *_Iter was non-NULL in the loop condition; this re-check
// presumably guards against the slot changing in between - confirm.
2855 if (_PSource == NULL)
\r
2860 _Sources.push_back(_PSource);
\r
// Every input slot must have a linked source for a recalculation to proceed.
2864 if (_Sources.size() != _NumInputs)
\r
2866 // Some of the sources were unlinked. The join is broken
\r
2870 // First, try and reserve all the messages. If a reservation fails,
\r
2871 // then release any reservations that had been made.
\r
2872 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)
\r
2874 // Swap the id to -1 indicating that we have used that value for a recalculate
\r
2875 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);
\r
2877 // If the id is -1, either we have never received a message on that link or the previous message is stored
\r
2878 // in the message array. If it is the former we abort.
\r
2879 // If the id is not -1, we attempt to reserve the message. On failure we abort.
\r
2880 if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL))
\r
2881 || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this)))
\r
2883 // Abort. Release all reservations made up until this block,
\r
2884 // and wait for another message to arrive.
// Only slots below i are visited: slot i either had no id or its reserve
// call has just failed.
2885 for (size_t j = 0; j < i; j++)
\r
2887 if (_M_savedIdBuffer[j] != -1)
\r
2889 _Sources[j]->release(_M_savedIdBuffer[j], this);
\r
// Put the snapshotted id back only if the slot still holds -1, so an id
// stored in the meantime is not overwritten.
2890 _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1);
\r
2894 // Return NULL to indicate that the create failed
\r
2899 // Since everything has been reserved, consume all the messages.
\r
2900 // This is guaranteed to return true.
\r
// Presumably counts the inputs for which a new message is consumed below.
2901 size_t _NewMessages = 0;
\r
2902 for (size_t i = 0; i < _NumInputs; i++)
\r
// A snapshot value of -1 means the message already stored for this input is reused.
2904 if (_M_savedIdBuffer[i] != -1)
\r
2906 // Delete previous message since we have a new one
\r
2907 if (_M_messageArray[i] != NULL)
\r
2909 delete _M_messageArray[i];
\r
2911 _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
\r
// Reset the snapshot entry now that its id has been consumed.
2912 _M_savedIdBuffer[i] = -1;
\r
2917 if (_NewMessages == 0)
\r
2919 // There is no need to recalculate if we did not consume a new message
\r
// Copy the payloads in slot order and hand them to the recalculate functor.
2923 std::vector<_Input> _OutputVector;
\r
2924 for (size_t i = 0; i < _NumInputs; i++)
\r
2926 _ASSERTE(_M_messageArray[i] != NULL);
\r
2927 _OutputVector.push_back(_M_messageArray[i]->payload);
\r
2930 _Output _Out = _M_pFunc(_OutputVector);
\r
// The result is wrapped in a heap-allocated message and returned as a raw pointer.
2931 return (new message<_Output>(_Out));
\r
2935 /// Initialize the recalculate block
\r
2937 /// <param name="_NumInputs">
\r
2938 /// The number of inputs
\r
// Shared constructor helper. _PScheduler and _PScheduleGroup are optional and
// default to NULL; both are forwarded to initialize_source_and_target.
2940 void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
\r
2942 initialize_source_and_target(_PScheduler, _PScheduleGroup);
\r
// Bound the connected-source container to the number of inputs.
2944 _M_connectedSources.set_bound(_NumInputs);
\r
2946 // Non greedy joins need a buffer to snap off saved message ids to.
\r
// Allocated with new[]; released with delete [] elsewhere in this class.
2947 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
\r
// Filling every byte with 0xFF makes each element read as -1, assuming
// runtime_object_identity is a two's-complement integer type - TODO confirm.
2948 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
\r
2951 // An array containing the accepted messages of this block, one slot per input.
\r
2952 std::vector<message<_Input>*> _M_messageArray;
\r
2954 // An array containing the msg ids of messages propagated to the array
\r
2955 // For non-greedy joins, this contains the message id of any message
\r
// A value of -1 marks a slot with no pending id; slots are read and written
// through _InterlockedExchange / _InterlockedCompareExchange.
2957 std::vector<runtime_object_identity> _M_savedMessageIdArray;
\r
2959 // Buffer for snapping saved ids in non-greedy joins
\r
// Raw array allocated with new[] in _Initialize.
2960 runtime_object_identity * _M_savedIdBuffer;
\r
2962 // The transformer method called by this block
\r
2963 _Recalculate_method _M_pFunc;
\r
2965 // Queue to hold output messages
\r
2966 MessageQueue<_Output> _M_messageBuffer;
\r
2970 // Container class to hold a join_transform block and keep unbounded buffers in front of each input.
\r
// Wraps a greedy join_transform and places one unbounded_buffer in front of
// each of its inputs.
2972 template<class _Input, class _Output>
\r
2973 class buffered_join
\r
// Signature of the transform applied to the vector of joined inputs.
2975 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;
\r
// Allocates _NumInputs buffers plus the join and links every buffer to the join.
2978 buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs)
\r
2980 m_buffers = new unbounded_buffer<_Input>*[_NumInputs];
\r
// NOTE(review): m_join is allocated with new; confirm it is also deleted on destruction.
2981 m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func);
\r
2983 for(int i = 0; i < _NumInputs; i++)
\r
2985 m_buffers[i] = new unbounded_buffer<_Input>;
\r
2986 m_buffers[i]->link_target(m_join);
\r
// Cleanup: delete each buffer, then the array of buffer pointers.
2992 for(int i = 0; i < m_numInputs; i++)
\r
2993 delete m_buffers[i];
\r
2994 delete [] m_buffers;
\r
2998 // Add input takes _PSource and connects it to the next input on this block
\r
2999 void add_input(ISource<_Input> * _PSource)
\r
// NOTE(review): no check of m_currentInput against m_numInputs is visible
// here - confirm callers never add more than m_numInputs sources.
3001 _PSource->link_target(m_buffers[m_currentInput]);
\r
3005 // link_target links this container block to _PTarget
\r
3006 void link_target(ITarget<_Output> * _PTarget)
\r
3008 m_join->link_target(_PTarget);
\r
// Index into m_buffers used by add_input for the next source.
3012 int m_currentInput;
\r
// Heap-allocated array of heap-allocated buffers, one per input.
3014 unbounded_buffer<_Input> ** m_buffers;
\r
// The greedy join that every buffer is linked to.
3015 join_transform<_Input,_Output,greedy> * m_join;
\r
3017 } // namespace Concurrency
\r