1 //--------------------------------------------------------------------------
\r
3 // Copyright (c) Microsoft Corporation. All rights reserved.
\r
5 // File: agents_extras.h
\r
7 // Implementation of various useful message blocks
\r
9 //--------------------------------------------------------------------------
\r
15 // bounded_buffer uses a map
\r
19 namespace Concurrency
\r
22 /// Simple queue class for storing messages.
\r
24 /// <typeparam name="_Type">
\r
25 /// The payload type of messages stored in this queue.
\r
27 template <class _Type>
\r
31 typedef message<_Type> _Message;
\r
34 /// Constructs an initially empty queue.
\r
41 /// Removes and deletes any messages remaining in the queue.
\r
45 _Message * _Msg = dequeue();
\r
46 while (_Msg != NULL)
\r
54 /// Add an item to the queue.
\r
56 /// <param name="_Msg">
\r
59 void enqueue(_Message *_Msg)
\r
61 _M_queue.push(_Msg);
\r
65 /// Dequeue an item from the head of queue.
\r
68 /// Returns a pointer to the message found at the head of the queue.
\r
70 _Message * dequeue()
\r
72 _Message * _Msg = NULL;
\r
74 if (!_M_queue.empty())
\r
76 _Msg = _M_queue.front();
\r
84 /// Return the item at the head of the queue, without dequeuing
\r
87 /// Returns a pointer to the message found at the head of the queue.
\r
89 _Message * peek() const
\r
91 _Message * _Msg = NULL;
\r
93 if (!_M_queue.empty())
\r
95 _Msg = _M_queue.front();
\r
102 /// Returns the number of items currently in the queue.
\r
105 /// Size of the queue.
\r
107 size_t count() const
\r
109 return _M_queue.size();
\r
113 /// Checks to see if specified msg id is at the head of the queue.
\r
115 /// <param name="_MsgId">
\r
116 /// Message id to check for.
\r
119 /// True if a message with specified id is at the head, false otherwise.
\r
121 bool is_head(const runtime_object_identity _MsgId) const
\r
123 _Message * _Msg = peek();
\r
126 return _Msg->msg_id() == _MsgId;
\r
133 std::queue<_Message *> _M_queue;
\r
137 /// Simple queue implementation that takes into account priority
\r
138 /// using the comparison operator <.
\r
140 /// <typeparam name="_Type">
\r
141 /// The payload type of messages stored in this queue.
\r
143 template <class _Type>
\r
144 class PriorityQueue
\r
148 /// Constructs an initially empty queue.
\r
150 PriorityQueue() : _M_pHead(NULL), _M_count(0) {}
\r
153 /// Removes and deletes any messages remaining in the queue.
\r
157 message<_Type> * _Msg = dequeue();
\r
158 while (_Msg != NULL)
\r
166 /// Add an item to the queue, comparisons using the 'payload' field
\r
167 /// will determine the location in the queue.
\r
169 /// <param name="_Msg">
\r
170 /// Message to add.
\r
172 /// <param name="fInsertAtHead">
\r
173 /// True if this new message can be inserted at the head.
\r
175 void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true)
\r
177 MessageNode *_Element = new MessageNode();
\r
178 _Element->_M_pMsg = _Msg;
\r
180 // Find location to insert.
\r
181 MessageNode *pCurrent = _M_pHead;
\r
182 MessageNode *pPrev = NULL;
\r
183 if(!fInsertAtHead && pCurrent != NULL)
\r
186 pCurrent = pCurrent->_M_pNext;
\r
188 while(pCurrent != NULL)
\r
190 if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload)
\r
195 pCurrent = pCurrent->_M_pNext;
\r
201 _M_pHead = _Element;
\r
205 pPrev->_M_pNext = _Element;
\r
208 // Last item in queue.
\r
209 if(pCurrent == NULL)
\r
211 _Element->_M_pNext = NULL;
\r
215 _Element->_M_pNext = pCurrent;
\r
222 /// Dequeue an item from the head of queue.
\r
225 /// Returns a pointer to the message found at the head of the queue.
\r
227 message<_Type> * dequeue()
\r
229 if (_M_pHead == NULL)
\r
234 MessageNode *_OldHead = _M_pHead;
\r
235 message<_Type> * _Result = _OldHead->_M_pMsg;
\r
237 _M_pHead = _OldHead->_M_pNext;
\r
241 if(--_M_count == 0)
\r
249 /// Return the item at the head of the queue, without dequeuing
\r
252 /// Returns a pointer to the message found at the head of the queue.
\r
254 message<_Type> * peek() const
\r
258 return _M_pHead->_M_pMsg;
\r
264 /// Returns the number of items currently in the queue.
\r
267 /// Size of the queue.
\r
269 size_t count() const
\r
275 /// Checks to see if specified msg id is at the head of the queue.
\r
277 /// <param name="_MsgId">
\r
278 /// Message id to check for.
\r
281 /// True if a message with specified id is at the head, false otherwise.
\r
283 bool is_head(const runtime_object_identity _MsgId) const
\r
287 return _M_pHead->_M_pMsg->msg_id() == _MsgId;
\r
294 // Used to store individual message nodes.
\r
297 MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {}
\r
298 message<_Type> * _M_pMsg;
\r
299 MessageNode * _M_pNext;
\r
302 // A pointer to the head of the queue.
\r
303 MessageNode * _M_pHead;
\r
305 // The number of elements presently stored in the queue.
\r
310 /// priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine
\r
311 /// order when offering to targets. Besides this it acts exactly like an unbounded_buffer.
\r
313 /// <typeparam name="_Type">
\r
314 /// The payload type of messages stored and propagated by the buffer.
\r
316 template<class _Type>
\r
317 class priority_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
322 /// Creates a priority_buffer within the default scheduler, and places it in any schedule
\r
323 /// group of the scheduler's choosing.
\r
327 initialize_source_and_target();
\r
331 /// Creates a priority_buffer within the default scheduler, and places it in any schedule
\r
332 /// group of the scheduler's choosing.
\r
334 /// <param name="_Filter">
\r
335 /// A reference to a filter function.
\r
337 priority_buffer(filter_method const& _Filter)
\r
339 initialize_source_and_target();
\r
340 register_filter(_Filter);
\r
344 /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
\r
345 /// group of the scheduler's choosing.
\r
347 /// <param name="_PScheduler">
\r
348 /// A reference to a scheduler instance.
\r
350 priority_buffer(Scheduler& _PScheduler)
\r
352 initialize_source_and_target(&_PScheduler);
\r
356 /// Creates a priority_buffer within the specified scheduler, and places it in any schedule
\r
357 /// group of the scheduler's choosing.
\r
359 /// <param name="_PScheduler">
\r
360 /// A reference to a scheduler instance.
\r
362 /// <param name="_Filter">
\r
363 /// A reference to a filter function.
\r
365 priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter)
\r
367 initialize_source_and_target(&_PScheduler);
\r
368 register_filter(_Filter);
\r
372 /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
\r
373 /// by the schedule group.
\r
375 /// <param name="_PScheduleGroup">
\r
376 /// A reference to a schedule group.
\r
378 priority_buffer(ScheduleGroup& _PScheduleGroup)
\r
380 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
384 /// Creates a priority_buffer within the specified schedule group. The scheduler is implied
\r
385 /// by the schedule group.
\r
387 /// <param name="_PScheduleGroup">
\r
388 /// A reference to a schedule group.
\r
390 /// <param name="_Filter">
\r
391 /// A reference to a filter function.
\r
393 priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
395 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
396 register_filter(_Filter);
\r
400 /// Cleans up any resources that may have been created by the priority_buffer.
\r
404 // Remove all links
\r
405 remove_network_links();
\r
409 /// Add an item to the priority_buffer
\r
411 /// <param name="_Item">
\r
412 /// A reference to the item to add.
\r
415 /// A boolean indicating whether the data was accepted.
\r
417 bool enqueue(_Type const& _Item)
\r
419 return Concurrency::send<_Type>(this, _Item);
\r
423 /// Remove an item from the priority_buffer
\r
426 /// The message payload.
\r
430 return receive<_Type>(this);
\r
436 /// The main propagate() function for ITarget blocks. Called by a source
\r
437 /// block, generally within an asynchronous task to send messages to its targets.
\r
439 /// <param name="_PMessage">
\r
440 /// A pointer to the message.
\r
442 /// <param name="_PSource">
\r
443 /// A pointer to the source block offering the message.
\r
446 /// An indication of what the target decided to do with the message.
\r
449 /// It is important that calls to propagate do *not* take the same lock on the
\r
450 /// internal structure that is used by Consume and the LWT. Doing so could
\r
451 /// result in a deadlock with the Consume call. (in the case of the priority_buffer,
\r
452 /// this lock is the m_internalLock)
\r
454 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
456 message_status _Result = accepted;
\r
458 // Accept the message being propagated
\r
459 // Note: depending on the source block propagating the message
\r
460 // this may not necessarily be the same message (pMessage) first
\r
461 // passed into the function.
\r
463 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
465 if (_PMessage != NULL)
\r
467 async_send(_PMessage);
\r
478 /// Synchronously sends a message to this block. When this function completes the message will
\r
479 /// already have propagated into the block.
\r
481 /// <param name="_PMessage">
\r
482 /// A pointer to the message.
\r
484 /// <param name="_PSource">
\r
485 /// A pointer to the source block offering the message.
\r
488 /// An indication of what the target decided to do with the message.
\r
490 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
492 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
494 if (_PMessage != NULL)
\r
496 sync_send(_PMessage);
\r
507 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
509 /// <param name="_MsgId">
\r
510 /// The runtime object identity of the message.
\r
513 /// A pointer to the message that the caller now has ownership of.
\r
515 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
518 // Peek at the head message in the message buffer. If the Ids match
\r
519 // dequeue and transfer ownership
\r
521 message<_Type> * _Msg = NULL;
\r
523 if (_M_messageBuffer.is_head(_MsgId))
\r
525 _Msg = _M_messageBuffer.dequeue();
\r
532 /// Reserves a message previously offered by the source.
\r
534 /// <param name="_MsgId">
\r
535 /// The runtime object identity of the message.
\r
538 /// A Boolean indicating whether the reservation worked or not.
\r
541 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
543 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
545 // Allow reservation if this is the head message
\r
546 return _M_messageBuffer.is_head(_MsgId);
\r
550 /// Consumes a message that was reserved previously.
\r
552 /// <param name="_MsgId">
\r
553 /// The runtime object identity of the message.
\r
556 /// A pointer to the message that the caller now has ownership of.
\r
559 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
561 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
563 // By default, accept the message
\r
564 return accept_message(_MsgId);
\r
568 /// Releases a previous message reservation.
\r
570 /// <param name="_MsgId">
\r
571 /// The runtime object identity of the message.
\r
573 virtual void release_message(runtime_object_identity _MsgId)
\r
575 // The head message is the one reserved.
\r
576 if (!_M_messageBuffer.is_head(_MsgId))
\r
578 throw message_not_found();
\r
583 /// Resumes propagation after a reservation has been released
\r
585 virtual void resume_propagation()
\r
587 // If there are any messages in the buffer, propagate them out
\r
588 if (_M_messageBuffer.count() > 0)
\r
595 /// Notification that a target was linked to this source.
\r
597 /// <param name="_PTarget">
\r
598 /// A pointer to the newly linked target.
\r
600 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
602 // If the message queue is blocked due to reservation
\r
603 // there is no need to do any message propagation
\r
604 if (_M_pReservedFor != NULL)
\r
609 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
613 // Propagate the head message to the new target
\r
614 message_status _Status = _PTarget->propagate(_Msg, this);
\r
616 if (_Status == accepted)
\r
618 // The target accepted the message, restart propagation.
\r
619 propagate_to_any_targets(NULL);
\r
622 // If the status is anything other than accepted, then leave
\r
623 // the message queue blocked.
\r
628 /// Takes the message and propagates it to all the targets of this priority_buffer.
\r
630 /// <param name="_PMessage">
\r
631 /// A pointer to a new message.
\r
633 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
635 // Enqueue _PMessage to the internal priority queue if it is non-NULL.
\r
636 // _PMessage can be NULL if this LWT was the result of a Repropagate call
\r
637 // out of a Consume or Release (where no new message is queued up, but
\r
638 // everything remaining in the priority_buffer needs to be propagated out)
\r
639 if (_PMessage != NULL)
\r
641 message<_Type> *pPrevHead = _M_messageBuffer.peek();
\r
643 // If a reservation is held make sure to not insert this new
\r
644 // message before it.
\r
645 if(_M_pReservedFor != NULL)
\r
647 _M_messageBuffer.enqueue(_PMessage, false);
\r
651 _M_messageBuffer.enqueue(_PMessage);
\r
654 // If the head message didn't change, we can safely assume that
\r
655 // the head message is blocked and waiting on Consume(), Release() or a new
\r
657 if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id()))
\r
663 // Attempt to propagate messages to all the targets
\r
664 _Propagate_priority_order();
\r
670 /// Attempts to propagate out any messages currently in the block.
\r
672 void _Propagate_priority_order()
\r
674 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
676 // If someone has reserved the _Head message, don't propagate anymore
\r
677 if (_M_pReservedFor != NULL)
\r
682 while (_Msg != NULL)
\r
684 message_status _Status = declined;
\r
686 // Always start from the first target that linked.
\r
687 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
689 ITarget<_Target_type> * _PTarget = *_Iter;
\r
690 _Status = _PTarget->propagate(_Msg, this);
\r
692 // Ownership of message changed. Do not propagate this
\r
693 // message to any other target.
\r
694 if (_Status == accepted)
\r
699 // If the target that was just propagated to reserved this message, stop
\r
700 // propagating it to others.
\r
701 if (_M_pReservedFor != NULL)
\r
707 // If status is anything other than accepted, then the head message
\r
708 // was not propagated out. Thus, nothing after it in the queue can
\r
709 // be propagated out. Cease propagation.
\r
710 if (_Status != accepted)
\r
715 // Get the next message
\r
716 _Msg = _M_messageBuffer.peek();
\r
721 /// Priority Queue used to store messages.
\r
723 PriorityQueue<_Type> _M_messageBuffer;
\r
726 // Hide assignment operator and copy constructor.
\r
728 priority_buffer const &operator =(priority_buffer const&); // no assignment operator
\r
729 priority_buffer(priority_buffer const &); // no copy constructor
\r
734 /// A bounded_buffer implementation. Once the capacity is reached it will save the offered message
\r
735 /// id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume
\r
736 /// any of the postponed messages. Preference is given to previously offered messages before new ones.
\r
738 /// NOTE: this bounded_buffer implementation contains code that is specific to this particular block.
\r
739 /// Extreme caution should be taken if code is directly copied and pasted from this class. The bounded_buffer
\r
740 /// implementation uses a critical_section, several interlocked operations, and additional calls to async_send.
\r
741 /// These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.
\r
743 /// <typeparam name="_Type">
\r
744 /// The payload type of messages stored and propagated by the buffer.
\r
746 template<class _Type>
\r
747 class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
751 /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
\r
752 /// group of the scheduler's choosing.
\r
754 bounded_buffer(const size_t capacity)
\r
755 : _M_capacity(capacity), _M_currentSize(0)
\r
757 initialize_source_and_target();
\r
761 /// Creates a bounded_buffer within the default scheduler, and places it in any schedule
\r
762 /// group of the scheduler's choosing.
\r
764 /// <param name="_Filter">
\r
765 /// A reference to a filter function.
\r
767 bounded_buffer(const size_t capacity, filter_method const& _Filter)
\r
768 : _M_capacity(capacity), _M_currentSize(0)
\r
770 initialize_source_and_target();
\r
771 register_filter(_Filter);
\r
775 /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
\r
776 /// group of the scheduler's choosing.
\r
778 /// <param name="_PScheduler">
\r
779 /// A reference to a scheduler instance.
\r
781 bounded_buffer(const size_t capacity, Scheduler& _PScheduler)
\r
782 : _M_capacity(capacity), _M_currentSize(0)
\r
784 initialize_source_and_target(&_PScheduler);
\r
788 /// Creates a bounded_buffer within the specified scheduler, and places it in any schedule
\r
789 /// group of the scheduler's choosing.
\r
791 /// <param name="_PScheduler">
\r
792 /// A reference to a scheduler instance.
\r
794 /// <param name="_Filter">
\r
795 /// A reference to a filter function.
\r
797 bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter)
\r
798 : _M_capacity(capacity), _M_currentSize(0)
\r
800 initialize_source_and_target(&_PScheduler);
\r
801 register_filter(_Filter);
\r
805 /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
\r
806 /// by the schedule group.
\r
808 /// <param name="_PScheduleGroup">
\r
809 /// A reference to a schedule group.
\r
811 bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)
\r
812 : _M_capacity(capacity), _M_currentSize(0)
\r
814 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
818 /// Creates a bounded_buffer within the specified schedule group. The scheduler is implied
\r
819 /// by the schedule group.
\r
821 /// <param name="_PScheduleGroup">
\r
822 /// A reference to a schedule group.
\r
824 /// <param name="_Filter">
\r
825 /// A reference to a filter function.
\r
827 bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
828 : _M_capacity(capacity), _M_currentSize(0)
\r
830 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
831 register_filter(_Filter);
\r
835 /// Cleans up any resources that may have been used by the bounded_buffer.
\r
839 // Remove all links
\r
840 remove_network_links();
\r
844 /// Add an item to the bounded_buffer.
\r
846 /// <param name="_Item">
\r
847 /// A reference to the item to add.
\r
850 /// A boolean indicating whether the data was accepted.
\r
852 bool enqueue(_Type const& _Item)
\r
854 return Concurrency::send<_Type>(this, _Item);
\r
858 /// Remove an item from the bounded_buffer.
\r
861 /// The message payload.
\r
865 return receive<_Type>(this);
\r
871 /// The main propagate() function for ITarget blocks. Called by a source
\r
872 /// block, generally within an asynchronous task to send messages to its targets.
\r
874 /// <param name="_PMessage">
\r
875 /// A pointer to the message.
\r
877 /// <param name="_PSource">
\r
878 /// A pointer to the source block offering the message.
\r
881 /// An indication of what the target decided to do with the message.
\r
884 /// It is important that calls to propagate do *not* take the same lock on the
\r
885 /// internal structure that is used by Consume and the LWT. Doing so could
\r
886 /// result in a deadlock with the Consume call. (in the case of the bounded_buffer,
\r
887 /// this lock is the m_internalLock)
\r
889 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
891 message_status _Result = accepted;
\r
893 // Check current capacity.
\r
894 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
896 // Postpone the message, buffer is full.
\r
897 _InterlockedDecrement(&_M_currentSize);
\r
898 _Result = postponed;
\r
900 // Save off the message id from this source to later try
\r
901 // and reserve/consume when more space is free.
\r
903 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
904 _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();
\r
912 // Accept the message being propagated
\r
913 // Note: depending on the source block propagating the message
\r
914 // this may not necessarily be the same message (pMessage) first
\r
915 // passed into the function.
\r
917 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
919 if (_PMessage != NULL)
\r
921 async_send(_PMessage);
\r
925 // Didn't get a message so need to decrement.
\r
926 _InterlockedDecrement(&_M_currentSize);
\r
936 /// Synchronously sends a message to this block. When this function completes the message will
\r
937 /// already have propagated into the block.
\r
939 /// <param name="_PMessage">
\r
940 /// A pointer to the message.
\r
942 /// <param name="_PSource">
\r
943 /// A pointer to the source block offering the message.
\r
946 /// An indication of what the target decided to do with the message.
\r
948 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
950 message_status _Result = accepted;
\r
952 // Check current capacity.
\r
953 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
955 // Postpone the message, buffer is full.
\r
956 _InterlockedDecrement(&_M_currentSize);
\r
957 _Result = postponed;
\r
959 // Save off the message id from this source to later try
\r
960 // and reserve/consume when more space is free.
\r
962 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
963 _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();
\r
971 // Accept the message being propagated
\r
972 // Note: depending on the source block propagating the message
\r
973 // this may not necessarily be the same message (pMessage) first
\r
974 // passed into the function.
\r
976 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
978 if (_PMessage != NULL)
\r
980 async_send(_PMessage);
\r
984 // Didn't get a message so need to decrement.
\r
985 _InterlockedDecrement(&_M_currentSize);
\r
995 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
997 /// <param name="_MsgId">
\r
998 /// The runtime object identity of the message.
\r
1001 /// A pointer to the message that the caller now has ownership of.
\r
1003 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
1006 // Peek at the head message in the message buffer. If the Ids match
\r
1007 // dequeue and transfer ownership
\r
1009 message<_Type> * _Msg = NULL;
\r
1011 if (_M_messageBuffer.is_head(_MsgId))
\r
1013 _Msg = _M_messageBuffer.dequeue();
\r
1015 // Give preference to any previously postponed messages
\r
1016 // before decrementing current size.
\r
1017 if(!try_consume_msg())
\r
1019 _InterlockedDecrement(&_M_currentSize);
\r
1027 /// Try to reserve and consume a message from list of saved message ids.
\r
1030 /// True if a message was successfully consumed, false otherwise.
\r
1032 bool try_consume_msg()
\r
1034 runtime_object_identity _ReservedId = -1;
\r
1035 ISource<_Type> * _PSource = NULL;
\r
1037 // Walk through source links seeing if any saved ids exist.
\r
1038 bool _ConsumedMsg = true;
\r
1039 while(_ConsumedMsg)
\r
1041 source_iterator _Iter = _M_connectedSources.begin();
\r
1043 critical_section::scoped_lock scopedLock(_M_savedIdsLock);
\r
1044 for (; *_Iter != NULL; ++_Iter)
\r
1046 _PSource = *_Iter;
\r
1047 std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;
\r
1048 if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())
\r
1050 _ReservedId = _MapIter->second;
\r
1051 _M_savedSourceMsgIds.erase(_MapIter);
\r
1057 // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.
\r
1058 if(_ReservedId != -1)
\r
1060 if(_PSource->reserve(_ReservedId, this))
\r
1062 message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);
\r
1063 async_send(_ConsumedMsg);
\r
1066 // Reserve failed or the link was removed,
\r
1067 // go back and try and find a different msg id.
\r
1074 // If this point is reached the map of source ids was empty.
\r
1082 /// Reserves a message previously offered by the source.
\r
1084 /// <param name="_MsgId">
\r
1085 /// The runtime object identity of the message.
\r
1088 /// A Boolean indicating whether the reservation worked or not.
\r
1091 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
1093 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
1095 // Allow reservation if this is the head message
\r
1096 return _M_messageBuffer.is_head(_MsgId);
\r
1100 /// Consumes a message that was reserved previously.
\r
1102 /// <param name="_MsgId">
\r
1103 /// The runtime object identity of the message.
\r
1106 /// A pointer to the message that the caller now has ownership of.
\r
1109 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
1111 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
1113 // By default, accept the message
\r
1114 return accept_message(_MsgId);
\r
1118 /// Releases a previous message reservation.
\r
1120 /// <param name="_MsgId">
\r
1121 /// The runtime object identity of the message.
\r
1123 virtual void release_message(runtime_object_identity _MsgId)
\r
1125 // The head message is the one reserved.
\r
1126 if (!_M_messageBuffer.is_head(_MsgId))
\r
1128 throw message_not_found();
\r
1133 /// Resumes propagation after a reservation has been released
\r
1135 virtual void resume_propagation()
\r
1137 // If there are any messages in the buffer, propagate them out
\r
1138 if (_M_messageBuffer.count() > 0)
\r
1145 /// Notification that a target was linked to this source.
\r
1147 /// <param name="_PTarget">
\r
1148 /// A pointer to the newly linked target.
\r
1150 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
1152 // If the message queue is blocked due to reservation
\r
1153 // there is no need to do any message propagation
\r
1154 if (_M_pReservedFor != NULL)
\r
1159 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
1163 // Propagate the head message to the new target
\r
1164 message_status _Status = _PTarget->propagate(_Msg, this);
\r
1166 if (_Status == accepted)
\r
1168 // The target accepted the message, restart propagation.
\r
1169 propagate_to_any_targets(NULL);
\r
1172 // If the status is anything other than accepted, then leave
\r
1173 // the message queue blocked.
\r
1178 /// Takes the message and propagates it to all the targets of this bounded_buffer.
\r
1179 /// This is called from async_send.
\r
1181 /// <param name="_PMessage">
\r
1182 /// A pointer to a new message.
\r
1184 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
1186 // Enqueue pMessage to the internal message buffer if it is non-NULL.
\r
1187 // pMessage can be NULL if this LWT was the result of a Repropagate call
\r
1188 // out of a Consume or Release (where no new message is queued up, but
\r
1189 // everything remaining in the bounded buffer needs to be propagated out)
\r
1190 if (_PMessage != NULL)
\r
1192 _M_messageBuffer.enqueue(_PMessage);
\r
1194 // If the incoming pMessage is not the head message, we can safely assume that
\r
1195 // the head message is blocked and waiting on Consume(), Release() or a new
\r
1196 // link_target() and cannot be propagated out.
\r
1197 if (_M_messageBuffer.is_head(_PMessage->msg_id()))
\r
1199 _Propagate_priority_order();
\r
1204 // While current size is less than capacity try to consume
\r
1205 // any previously offered ids.
\r
1206 bool _ConsumedMsg = true;
\r
1207 while(_ConsumedMsg)
\r
1209 // Assume a message will be found to successfully consume in the
\r
1210 // saved ids, if not this will be decremented afterwards.
\r
1211 if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)
\r
1216 _ConsumedMsg = try_consume_msg();
\r
1219 // Decrement the current size, we broke out of the previous loop
\r
1220 // because we reached capacity or there were no more messages to consume.
\r
1221 _InterlockedDecrement(&_M_currentSize);
\r
1228 /// Attempts to propagate out any messages currently in the block.
\r
1230 void _Propagate_priority_order()
\r
1232 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
1234 // If someone has reserved the _Head message, don't propagate anymore
\r
1235 if (_M_pReservedFor != NULL)
\r
1240 while (_Msg != NULL)
\r
1242 message_status _Status = declined;
\r
1244 // Always start from the first target that linked.
\r
1245 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
1247 ITarget<_Target_type> * _PTarget = *_Iter;
\r
1248 _Status = _PTarget->propagate(_Msg, this);
\r
1250 // Ownership of message changed. Do not propagate this
\r
1251 // message to any other target.
\r
1252 if (_Status == accepted)
\r
1257 // If the target that was just propagated to reserved this message, stop
\r
1258 // propagating it to others.
\r
1259 if (_M_pReservedFor != NULL)
\r
1265 // If status is anything other than accepted, then the head message
\r
1266 // was not propagated out. Thus, nothing after it in the queue can
\r
1267 // be propagated out. Cease propagation.
\r
1268 if (_Status != accepted)
\r
1273 // Get the next message
\r
1274 _Msg = _M_messageBuffer.peek();
\r
1279 /// Message buffer used to store messages.
\r
1281 MessageQueue<_Type> _M_messageBuffer;
\r
1284 /// Maximum number of messages bounded_buffer can hold.
\r
1286 const size_t _M_capacity;
\r
1289 /// Current number of messages in bounded_buffer.
\r
1291 volatile long _M_currentSize;
\r
1294 /// Lock used to guard saved message ids map.
\r
1296 critical_section _M_savedIdsLock;
\r
1299 /// Map of source links to saved message ids.
\r
1301 std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;
\r
1304 // Hide assignment operator and copy constructor
\r
1306 bounded_buffer const &operator =(bounded_buffer const&); // no assignment operator
\r
1307 bounded_buffer(bounded_buffer const &); // no copy constructor
\r
1311 /// A simple alternator, offers messages in order to each target
\r
1312 /// one at a time. If a consume occurs a message won't be offered to that target again
\r
1313 /// until all others are given a chance. This causes messages to be distributed more
\r
1314 /// evenly among targets.
\r
1316 /// <typeparam name="_Type">
\r
1317 /// The payload type of messages stored and propagated by the buffer.
\r
1319 template<class _Type>
\r
1320 class alternator : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
\r
1324 /// Creates an alternator within the default scheduler, and places it in any schedule
\r
1325 /// group of the scheduler's choosing.
\r
1328 : _M_indexNextTarget(0)
\r
1330 initialize_source_and_target();
\r
1334 /// Creates an alternator within the default scheduler, and places it in any schedule
\r
1335 /// group of the scheduler's choosing.
\r
1337 /// <param name="_Filter">
\r
1338 /// A reference to a filter function.
\r
1340 alternator(filter_method const& _Filter)
\r
1341 : _M_indexNextTarget(0)
\r
1343 initialize_source_and_target();
\r
1344 register_filter(_Filter);
\r
1348 /// Creates an alternator within the specified scheduler, and places it in any schedule
\r
1349 /// group of the scheduler
\92s choosing.
\r
1351 /// <param name="_PScheduler">
\r
1352 /// A reference to a scheduler instance.
\r
1354 alternator(Scheduler& _PScheduler)
\r
1355 : _M_indexNextTarget(0)
\r
1357 initialize_source_and_target(&_PScheduler);
\r
1361 /// Creates an alternator within the specified scheduler, and places it any schedule
\r
1362 /// group of the scheduler
\92s choosing.
\r
1364 /// <param name="_PScheduler">
\r
1365 /// A reference to a scheduler instance.
\r
1367 /// <param name="_Filter">
\r
1368 /// A reference to a filter function.
\r
1370 alternator(Scheduler& _PScheduler, filter_method const& _Filter)
\r
1371 : _M_indexNextTarget(0)
\r
1373 initialize_source_and_target(&_PScheduler);
\r
1374 register_filter(_Filter);
\r
1378 /// Creates an alternator within the specified schedule group. The scheduler is implied
\r
1379 /// by the schedule group.
\r
1381 /// <param name="_PScheduleGroup">
\r
1382 /// A reference to a schedule group.
\r
1384 alternator(ScheduleGroup& _PScheduleGroup)
\r
1385 : _M_indexNextTarget(0)
\r
1387 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
1391 /// Creates an alternator within the specified schedule group. The scheduler is implied
\r
1392 /// by the schedule group.
\r
1394 /// <param name="_PScheduleGroup">
\r
1395 /// A reference to a schedule group.
\r
1397 /// <param name="_Filter">
\r
1398 /// A reference to a filter function.
\r
1400 alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)
\r
1401 : _M_indexNextTarget(0)
\r
1403 initialize_source_and_target(NULL, &_PScheduleGroup);
\r
1404 register_filter(_Filter);
\r
1408 /// Cleans up any resources that may have been created by the alternator.
\r
1412 // Remove all links
\r
1413 remove_network_links();
\r
1419 /// The main propagate() function for ITarget blocks. Called by a source
\r
1420 /// block, generally within an asynchronous task to send messages to its targets.
\r
1422 /// <param name="_PMessage">
\r
1423 /// A pointer to the message
\r
1425 /// <param name="_PSource">
\r
1426 /// A pointer to the source block offering the message.
\r
1429 /// An indication of what the target decided to do with the message.
\r
1432 /// It is important that calls to propagate do *not* take the same lock on the
\r
1433 /// internal structure that is used by Consume and the LWT. Doing so could
\r
1434 /// result in a deadlock with the Consume call. (in the case of the alternator,
\r
1435 /// this lock is the m_internalLock)
\r
1437 virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
1439 message_status _Result = accepted;
\r
1441 // Accept the message being propagated
\r
1442 // Note: depending on the source block propagating the message
\r
1443 // this may not necessarily be the same message (pMessage) first
\r
1444 // passed into the function.
\r
1446 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
1448 if (_PMessage != NULL)
\r
1450 async_send(_PMessage);
\r
1461 /// Synchronously sends a message to this block. When this function completes the message will
\r
1462 /// already have propagated into the block.
\r
1464 /// <param name="_PMessage">
\r
1465 /// A pointer to the message.
\r
1467 /// <param name="_PSource">
\r
1468 /// A pointer to the source block offering the message.
\r
1471 /// An indication of what the target decided to do with the message.
\r
1473 virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)
\r
1475 _PMessage = _PSource->accept(_PMessage->msg_id(), this);
\r
1477 if (_PMessage != NULL)
\r
1479 sync_send(_PMessage);
\r
1490 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
1492 /// <param name="_MsgId">
\r
1493 /// The runtime object identity of the message.
\r
1496 /// A pointer to the message that the caller now has ownership of.
\r
1498 virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
\r
1501 // Peek at the head message in the message buffer. If the Ids match
\r
1502 // dequeue and transfer ownership
\r
1504 message<_Type> * _Msg = NULL;
\r
1506 if (_M_messageBuffer.is_head(_MsgId))
\r
1508 _Msg = _M_messageBuffer.dequeue();
\r
1515 /// Reserves a message previously offered by the source.
\r
1517 /// <param name="_MsgId">
\r
1518 /// The runtime object identity of the message.
\r
1521 /// A Boolean indicating whether the reservation worked or not.
\r
1524 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
1526 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
1528 // Allow reservation if this is the head message
\r
1529 return _M_messageBuffer.is_head(_MsgId);
\r
1533 /// Consumes a message that was reserved previously.
\r
1535 /// <param name="_MsgId">
\r
1536 /// The runtime object identity of the message.
\r
1539 /// A pointer to the message that the caller now has ownership of.
\r
1542 /// Similar to 'accept', but is always preceded by a call to 'reserve'.
\r
1544 virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
\r
1546 // Update so we don't offer to this target again until
\r
1547 // all others have a chance.
\r
1548 target_iterator _CurrentIter = _M_connectedTargets.begin();
\r
1549 for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i)
\r
1551 if(*_CurrentIter == _M_pReservedFor)
\r
1553 _M_indexNextTarget = i + 1;
\r
1558 // By default, accept the message
\r
1559 return accept_message(_MsgId);
\r
1563 /// Releases a previous message reservation.
\r
1565 /// <param name="_MsgId">
\r
1566 /// The runtime object identity of the message.
\r
1568 virtual void release_message(runtime_object_identity _MsgId)
\r
1570 // The head message is the one reserved.
\r
1571 if (!_M_messageBuffer.is_head(_MsgId))
\r
1573 throw message_not_found();
\r
1578 /// Resumes propagation after a reservation has been released.
\r
1580 virtual void resume_propagation()
\r
1582 // If there are any messages in the buffer, propagate them out
\r
1583 if (_M_messageBuffer.count() > 0)
\r
1590 /// Notification that a target was linked to this source.
\r
1592 /// <param name="_PTarget">
\r
1593 /// A pointer to the newly linked target.
\r
1595 virtual void link_target_notification(ITarget<_Type> * _PTarget)
\r
1597 // If the message queue is blocked due to reservation
\r
1598 // there is no need to do any message propagation
\r
1599 if (_M_pReservedFor != NULL)
\r
1604 message<_Type> * _Msg = _M_messageBuffer.peek();
\r
1608 // Propagate the head message to the new target
\r
1609 message_status _Status = _PTarget->propagate(_Msg, this);
\r
1611 if (_Status == accepted)
\r
1613 // The target accepted the message, restart propagation.
\r
1614 propagate_to_any_targets(NULL);
\r
1617 // If the status is anything other than accepted, then leave
\r
1618 // the message queue blocked.
\r
1623 /// Takes the message and propagates it to all the targets of this alternator.
\r
1624 /// This is called from async_send.
\r
1626 /// <param name="_PMessage">
\r
1627 /// A pointer to a new message.
\r
1629 virtual void propagate_to_any_targets(message<_Type> * _PMessage)
\r
1631 // Enqueue pMessage to the internal buffer queue if it is non-NULL.
\r
1632 // pMessage can be NULL if this LWT was the result of a Repropagate call
\r
1633 // out of a Consume or Release (where no new message is queued up, but
\r
1634 // everything remaining in the unbounded buffer needs to be propagated out)
\r
1635 if (_PMessage != NULL)
\r
1637 _M_messageBuffer.enqueue(_PMessage);
\r
1639 // If the incoming pMessage is not the head message, we can safely assume that
\r
1640 // the head message is blocked and waiting on Consume(), Release() or a new
\r
1642 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))
\r
1648 // Attempt to propagate messages to targets in order last left off.
\r
1649 _Propagate_alternating_order();
\r
1653 /// Offers messages to targets in alternating order to help distribute messages
\r
1654 /// evenly among targets.
\r
1656 void _Propagate_alternating_order()
\r
1658 message<_Target_type> * _Msg = _M_messageBuffer.peek();
\r
1660 // If someone has reserved the _Head message, don't propagate anymore
\r
1661 if (_M_pReservedFor != NULL)
\r
1667 // Try to start where left off before, if the link has been removed
\r
1668 // or this is the first time then start at the beginning.
\r
1670 target_iterator _CurrentIter = _M_connectedTargets.begin();
\r
1671 const target_iterator _FirstLinkIter(_CurrentIter);
\r
1672 for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {}
\r
1674 while (_Msg != NULL)
\r
1676 message_status _Status = declined;
\r
1678 // Loop offering message until end of links is reached.
\r
1679 target_iterator _StartedIter(_CurrentIter);
\r
1680 for(;*_CurrentIter != NULL; ++_CurrentIter)
\r
1682 _Status = (*_CurrentIter)->propagate(_Msg, this);
\r
1683 ++_M_indexNextTarget;
\r
1685 // Ownership of message changed. Do not propagate this
\r
1686 // message to any other target.
\r
1687 if (_Status == accepted)
\r
1693 // If the target just propagated to reserved this message, stop
\r
1694 // propagating it to others
\r
1695 if (_M_pReservedFor != NULL)
\r
1701 // Message ownership changed go to next messages.
\r
1702 if (_Status == accepted)
\r
1707 // Try starting from the beginning until the first link offering was started at.
\r
1708 _M_indexNextTarget = 0;
\r
1709 for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter)
\r
1711 // I have offered the same message to all links now so stop.
\r
1712 if(*_CurrentIter == *_StartedIter)
\r
1717 _Status = (*_CurrentIter)->propagate(_Msg, this);
\r
1718 ++_M_indexNextTarget;
\r
1720 // Ownership of message changed. Do not propagate this
\r
1721 // message to any other target.
\r
1722 if (_Status == accepted)
\r
1728 // If the target just propagated to reserved this message, stop
\r
1729 // propagating it to others
\r
1730 if (_M_pReservedFor != NULL)
\r
1736 // If status is anything other than accepted, then the head message
\r
1737 // was not propagated out. Thus, nothing after it in the queue can
\r
1738 // be propagated out. Cease propagation.
\r
1739 if (_Status != accepted)
\r
1744 // Get the next message
\r
1745 _Msg = _M_messageBuffer.peek();
\r
1752 /// Message queue used to store messages.
\r
1754 MessageQueue<_Type> _M_messageBuffer;
\r
1757 /// Index of next target to call propagate on. Used to alternate and load
\r
1758 /// balance message offering.
\r
1760 size_t _M_indexNextTarget;
\r
1763 // Hide assignment operator and copy constructor.
\r
1765 alternator const &operator =(alternator const&); // no assignment operator
\r
1766 alternator(alternator const &); // no copy constructor
\r
1769 #include <agents.h>
\r
1772 // Sample block that combines join and transform.
\r
1774 template<class _Input, class _Output, join_type _Jtype = non_greedy>
\r
1775 class join_transform : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
\r
1779 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;
\r
1782 /// Create an join block within the default scheduler, and places it any schedule
\r
1783 /// group of the scheduler
\92s choosing.
\r
1785 /// <param name="_NumInputs">
\r
1786 /// The number of inputs this join will be allowed
\r
1788 join_transform(size_t _NumInputs, _Transform_method const& _Func)
\r
1789 : _M_messageArray(_NumInputs, 0),
\r
1790 _M_savedMessageIdArray(_NumInputs, -1),
\r
1793 _Initialize(_NumInputs);
\r
1797 /// Create an join block within the default scheduler, and places it any schedule
\r
1798 /// group of the scheduler
\92s choosing.
\r
1800 /// <param name="_NumInputs">
\r
1801 /// The number of inputs this join will be allowed
\r
1803 /// <param name="_Filter">
\r
1804 /// A filter method placed on this join
\r
1806 join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1807 : _M_messageArray(_NumInputs, 0),
\r
1808 _M_savedMessageIdArray(_NumInputs, -1),
\r
1811 _Initialize(_NumInputs);
\r
1812 register_filter(_Filter);
\r
1816 /// Create an join block within the specified scheduler, and places it any schedule
\r
1817 /// group of the scheduler
\92s choosing.
\r
1819 /// <param name="_Scheduler">
\r
1820 /// The scheduler onto which the task's message propagation will be scheduled.
\r
1822 /// <param name="_NumInputs">
\r
1823 /// The number of inputs this join will be allowed
\r
1825 join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func)
\r
1826 : _M_messageArray(_NumInputs, 0),
\r
1827 _M_savedMessageIdArray(_NumInputs, -1),
\r
1830 _Initialize(_NumInputs, &_PScheduler);
\r
1834 /// Create an join block within the specified scheduler, and places it any schedule
\r
1835 /// group of the scheduler
\92s choosing.
\r
1837 /// <param name="_Scheduler">
\r
1838 /// The scheduler onto which the task's message propagation will be scheduled.
\r
1840 /// <param name="_NumInputs">
\r
1841 /// The number of inputs this join will be allowed
\r
1843 /// <param name="_Filter">
\r
1844 /// A filter method placed on this join
\r
1846 join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1847 : _M_messageArray(_NumInputs, 0),
\r
1848 _M_savedMessageIdArray(_NumInputs, -1),
\r
1851 _Initialize(_NumInputs, &_PScheduler);
\r
1852 register_filter(_Filter);
\r
1856 /// Create an join block within the specified schedule group. The scheduler is implied
\r
1857 /// by the schedule group.
\r
1859 /// <param name="_PScheduleGroup">
\r
1860 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
1862 /// <param name="_NumInputs">
\r
1863 /// The number of inputs this join will be allowed
\r
1865 join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func)
\r
1866 : _M_messageArray(_NumInputs, 0),
\r
1867 _M_savedMessageIdArray(_NumInputs, -1),
\r
1870 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
1874 /// Create an join block within the specified schedule group. The scheduler is implied
\r
1875 /// by the schedule group.
\r
1877 /// <param name="_PScheduleGroup">
\r
1878 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
1880 /// <param name="_NumInputs">
\r
1881 /// The number of inputs this join will be allowed
\r
1883 /// <param name="_Filter">
\r
1884 /// A filter method placed on this join
\r
1886 join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)
\r
1887 : _M_messageArray(_NumInputs, 0),
\r
1888 _M_savedMessageIdArray(_NumInputs, -1),
\r
1891 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
1892 register_filter(_Filter);
\r
1896 /// Destroys a join
\r
1900 // Remove all links that are targets of this join
\r
1901 remove_network_links();
\r
1903 delete [] _M_savedIdBuffer;
\r
1908 // propagator_block protected function implementations
\r
1912 /// The main propagate() function for ITarget blocks. Called by a source
\r
1913 /// block, generally within an asynchronous task to send messages to its targets.
\r
1915 /// <param name="_PMessage">
\r
1916 /// The message being propagated
\r
1918 /// <param name="_PSource">
\r
1919 /// The source doing the propagation
\r
1922 /// An indication of what the target decided to do with the message.
\r
1925 /// It is important that calls to propagate do *not* take the same lock on the
\r
1926 /// internal structure that is used by Consume and the LWT. Doing so could
\r
1927 /// result in a deadlock with the Consume call.
\r
1929 message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource)
\r
1931 message_status _Ret_val = accepted;
\r
1934 // Find the slot index of this source
\r
1937 bool _Found = false;
\r
1938 for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
\r
1940 if (*_Iter == _PSource)
\r
1951 // If this source was not found in the array, this is not a connected source
\r
1952 // decline the message
\r
1956 _ASSERTE(_Slot < _M_messageArray.size());
\r
1958 bool fIsGreedy = (_Jtype == greedy);
\r
1963 // Greedy type joins immediately accept the message.
\r
1966 critical_section::scoped_lock lockHolder(_M_propagationLock);
\r
1967 if (_M_messageArray[_Slot] != NULL)
\r
1969 _M_savedMessageIdArray[_Slot] = _PMessage->msg_id();
\r
1970 _Ret_val = postponed;
\r
1974 if (_Ret_val != postponed)
\r
1976 _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
\r
1978 if (_M_messageArray[_Slot] != NULL)
\r
1980 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
1982 // If messages have arrived on all links, start a propagation
\r
1983 // of the current message
\r
1989 _Ret_val = missed;
\r
1996 // Non-greedy type joins save the message ids until they have all arrived
\r
1999 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)
\r
2001 // Decrement the message remaining count if this thread is switching
\r
2002 // the saved id from -1 to a valid value.
\r
2003 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
2009 // Always return postponed. This message will be consumed
\r
2011 _Ret_val = postponed;
\r
2018 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
2020 /// <param name="_MsgId">
\r
2021 /// The runtime object identity of the message.
\r
2024 /// A pointer to the message that the caller now has ownership of.
\r
2026 virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
\r
2029 // Peek at the head message in the message buffer. If the Ids match
\r
2030 // dequeue and transfer ownership
\r
2032 message<_Output> * _Msg = NULL;
\r
2034 if (_M_messageBuffer.is_head(_MsgId))
\r
2036 _Msg = _M_messageBuffer.dequeue();
\r
2043 /// Reserves a message previously offered by the source.
\r
2045 /// <param name="_MsgId">
\r
2046 /// The runtime object identity of the message.
\r
2049 /// A bool indicating whether the reservation worked or not
\r
2052 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
2054 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
2056 // Allow reservation if this is the head message
\r
2057 return _M_messageBuffer.is_head(_MsgId);
\r
2061 /// Consumes a message previously offered by the source and reserved by the target,
\r
2062 /// transferring ownership to the caller.
\r
2064 /// <param name="_MsgId">
\r
2065 /// The runtime object identity of the message.
\r
2068 /// A pointer to the message that the caller now has ownership of.
\r
2071 /// Similar to 'accept', but is always preceded by a call to 'reserve'
\r
2073 virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
\r
2075 // By default, accept the message
\r
2076 return accept_message(_MsgId);
\r
2080 /// Releases a previous message reservation.
\r
2082 /// <param name="_MsgId">
\r
2083 /// The runtime object identity of the message.
\r
2085 virtual void release_message(runtime_object_identity _MsgId)
\r
2087 // The head message is the one reserved.
\r
2088 if (!_M_messageBuffer.is_head(_MsgId))
\r
2090 throw message_not_found();
\r
2095 /// Resumes propagation after a reservation has been released
\r
2097 virtual void resume_propagation()
\r
2099 // If there are any messages in the buffer, propagate them out
\r
2100 if (_M_messageBuffer.count() > 0)
\r
2107 /// Notification that a target was linked to this source.
\r
2109 /// <param name="_PTarget">
\r
2110 /// A pointer to the newly linked target.
\r
2112 virtual void link_target_notification(ITarget<_Output> *)
\r
2114 // If the message queue is blocked due to reservation
\r
2115 // there is no need to do any message propagation
\r
2116 if (_M_pReservedFor != NULL)
\r
2121 _Propagate_priority_order(_M_messageBuffer);
\r
2125 /// Takes the message and propagates it to all the target of this join.
\r
2126 /// This is called from async_send.
\r
2128 /// <param name="_PMessage">
\r
2129 /// The message being propagated
\r
2131 void propagate_to_any_targets(message<_Output> *)
\r
2133 message<_Output> * _Msg = NULL;
\r
2134 // Create a new message from the input sources
\r
2135 // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
\r
2136 // a consume or release from the target. In that case we don't want to create a new message.
\r
2137 if (_M_messagesRemaining == 0)
\r
2139 // A greedy join can immediately create the message, a non-greedy
\r
2140 // join must try and consume all the messages it has postponed
\r
2141 _Msg = _Create_new_message();
\r
2146 // Create message failed. This happens in non_greedy joins when the
\r
2147 // reserve/consumption of a postponed message failed.
\r
2148 _Propagate_priority_order(_M_messageBuffer);
\r
2152 bool fIsGreedy = (_Jtype == greedy);
\r
2154 // For a greedy join, reset the number of messages remaining
\r
2155 // Check to see if multiple messages have been passed in on any of the links,
\r
2156 // and postponed. If so, try and reserve/consume them now
\r
2159 // Look at the saved ids and reserve/consume any that have passed in while
\r
2160 // this join was waiting to complete
\r
2161 _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size());
\r
2163 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2167 runtime_object_identity _Saved_id;
\r
2168 // Grab the current saved id value. This value could be changing from based on any
\r
2169 // calls of source->propagate(this). If the message id is different than what is snapped
\r
2170 // here, that means, the reserve below must fail. This is because reserve is trying
\r
2171 // to get the same source lock the propagate(this) call must be holding.
\r
2173 critical_section::scoped_lock lockHolder(_M_propagationLock);
\r
2175 _ASSERTE(_M_messageArray[i] != NULL);
\r
2177 _Saved_id = _M_savedMessageIdArray[i];
\r
2179 if (_Saved_id == -1)
\r
2181 _M_messageArray[i] = NULL;
\r
2186 _M_savedMessageIdArray[i] = -1;
\r
2190 if (_Saved_id != -1)
\r
2192 source_iterator _Iter = _M_connectedSources.begin();
\r
2194 ISource<_Input> * _PSource = _Iter[i];
\r
2195 if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
\r
2197 _M_messageArray[i] = _PSource->consume(_Saved_id, this);
\r
2198 _InterlockedDecrementSizeT(&_M_messagesRemaining);
\r
2205 // If messages have all been received, async_send again, this will start the
\r
2206 // LWT up to create a new message
\r
2207 if (_M_messagesRemaining == 0)
\r
2213 // Add the new message to the outbound queue
\r
2214 _M_messageBuffer.enqueue(_Msg);
\r
2216 if (!_M_messageBuffer.is_head(_Msg->msg_id()))
\r
2218 // another message is at the head of the outbound message queue and blocked
\r
2223 _Propagate_priority_order(_M_messageBuffer);
\r
2229 // Private Methods
\r
2233 /// Propagate messages in priority order
\r
2235 /// <param name="_MessageBuffer">
\r
2236 /// Reference to a message queue with messages to be propagated
\r
2238 void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)
\r
2240 message<_Output> * _Msg = _MessageBuffer.peek();
\r
2242 // If someone has reserved the _Head message, don't propagate anymore
\r
2243 if (_M_pReservedFor != NULL)
\r
2248 while (_Msg != NULL)
\r
2250 message_status _Status = declined;
\r
2252 // Always start from the first target that linked
\r
2253 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
2255 ITarget<_Output> * _PTarget = *_Iter;
\r
2256 _Status = _PTarget->propagate(_Msg, this);
\r
2258 // Ownership of message changed. Do not propagate this
\r
2259 // message to any other target.
\r
2260 if (_Status == accepted)
\r
2265 // If the target just propagated to reserved this message, stop
\r
2266 // propagating it to others
\r
2267 if (_M_pReservedFor != NULL)
\r
2273 // If status is anything other than accepted, then the head message
\r
2274 // was not propagated out. Thus, nothing after it in the queue can
\r
2275 // be propagated out. Cease propagation.
\r
2276 if (_Status != accepted)
\r
2281 // Get the next message
\r
2282 _Msg = _MessageBuffer.peek();
\r
2287 /// Create a new message from the data output
\r
2290 /// The created message (NULL if creation failed)
\r
2292 message<_Output> * __cdecl _Create_new_message()
\r
2294 bool fIsNonGreedy = (_Jtype == non_greedy);
\r
2296 // If this is a non-greedy join, check each source and try to consume their message
\r
2300 // The iterator _Iter below will ensure that it is safe to touch
\r
2301 // non-NULL source pointers. Take a snapshot.
\r
2302 std::vector<ISource<_Input> *> _Sources;
\r
2303 source_iterator _Iter = _M_connectedSources.begin();
\r
2305 while (*_Iter != NULL)
\r
2307 ISource<_Input> * _PSource = *_Iter;
\r
2309 if (_PSource == NULL)
\r
2314 _Sources.push_back(_PSource);
\r
2318 if (_Sources.size() != _M_messageArray.size())
\r
2320 // Some of the sources were unlinked. The join is broken
\r
2324 // First, try and reserve all the messages. If a reservation fails,
\r
2325 // then release any reservations that had been made.
\r
2326 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)
\r
2328 // Snap the current saved id into a buffer. This value can be changing behind the scenes from
\r
2329 // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
\r
2331 _InterlockedIncrementSizeT(&_M_messagesRemaining);
\r
2332 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);
\r
2334 _ASSERTE(_M_savedIdBuffer[i] != -1);
\r
2336 if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
\r
2338 // A reservation failed, release all reservations made up until
\r
2339 // this block, and wait for another message to arrive on this link
\r
2340 for (size_t j = 0; j < i; j++)
\r
2342 _Sources[j]->release(_M_savedIdBuffer[j], this);
\r
2343 if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1)
\r
2345 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)
\r
2352 // Return NULL to indicate that the create failed
\r
2357 // Since everything has been reserved, consume all the messages.
\r
2358 // This is guaranteed to return true.
\r
2359 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2361 _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
\r
2362 _M_savedIdBuffer[i] = -1;
\r
2366 if (!fIsNonGreedy)
\r
2368 // Reinitialize how many messages are being waited for.
\r
2369 // This is safe because all messages have been received, thus no new async_sends for
\r
2370 // greedy joins can be called.
\r
2371 _M_messagesRemaining = _M_messageArray.size();
\r
2374 std::vector<_Input> _OutputVector;
\r
2375 for (size_t i = 0; i < _M_messageArray.size(); i++)
\r
2377 _ASSERTE(_M_messageArray[i] != NULL);
\r
2378 _OutputVector.push_back(_M_messageArray[i]->payload);
\r
2380 delete _M_messageArray[i];
\r
2383 _M_messageArray[i] = NULL;
\r
2387 _Output _Out = _M_pFunc(_OutputVector);
\r
2389 return (new message<_Output>(_Out));
\r
2393 /// Initialize the join block
\r
2395 /// <param name="_NumInputs">
\r
2396 /// The number of inputs
\r
2398 void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
\r
2400 initialize_source_and_target(_PScheduler, _PScheduleGroup);
\r
2402 _M_connectedSources.set_bound(_NumInputs);
\r
2403 _M_messagesRemaining = _NumInputs;
\r
2405 bool fIsNonGreedy = (_Jtype == non_greedy);
\r
2409 // Non greedy joins need a buffer to snap off saved message ids to.
\r
2410 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
\r
2411 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
\r
2415 _M_savedIdBuffer = NULL;
\r
2419 // The current number of messages remaining
\r
2420 volatile size_t _M_messagesRemaining;
\r
2422 // An array containing the accepted messages of this join.
\r
2423 std::vector<message<_Input>*> _M_messageArray;
\r
2425 // An array containing the msg ids of messages propagated to the array
\r
2426 // For greedy joins, this contains a log of other messages passed to this
\r
2427 // join after the first has been accepted
\r
2428 // For non-greedy joins, this contains the message id of any message
\r
2430 std::vector<runtime_object_identity> _M_savedMessageIdArray;
\r
2432 // The transformer method called by this block
\r
2433 _Transform_method _M_pFunc;
\r
2435 // Buffer for snapping saved ids in non-greedy joins
\r
2436 runtime_object_identity * _M_savedIdBuffer;
\r
2438 // A lock for modifying the buffer or the connected blocks
\r
2439 ::Concurrency::critical_section _M_propagationLock;
\r
2441 // Queue to hold output messages
\r
2442 MessageQueue<_Output> _M_messageBuffer;
\r
2446 // Message block that invokes a transform method when it receives message on any of the input links.
\r
2447 // A typical example is the recalculation engine for a cell in an Excel spreadsheet.
\r
2448 // (Remember that a normal join block is triggered only when it receives messages on all its input links).
\r
2450 template<class _Input, class _Output>
\r
2451 class recalculate : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
\r
2454 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method;
\r
2457 /// Creates a recalculate block within the default scheduler, and places it in any schedule
\r
2458 /// group of the scheduler's choosing.
\r
2460 /// <param name="_NumInputs">
\r
2461 /// The number of inputs
\r
2463 recalculate(size_t _NumInputs, _Recalculate_method const& _Func)
\r
2464 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2467 _Initialize(_NumInputs);
\r
2471 /// Creates a recalculate block within the default scheduler, and places it in any schedule
\r
2472 /// group of the scheduler's choosing.
\r
2474 /// <param name="_NumInputs">
\r
2475 /// The number of inputs
\r
2477 /// <param name="_Filter">
\r
2478 /// A filter method placed on this join
\r
2480 recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
2481 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2484 _Initialize(_NumInputs);
\r
2485 register_filter(_Filter);
\r
2489 /// Creates a recalculate block within the specified scheduler, and places it in any schedule
\r
2490 /// group of the scheduler's choosing.
\r
2492 /// <param name="_Scheduler">
\r
2493 /// The scheduler onto which the task's message propagation will be scheduled.
\r
2495 /// <param name="_NumInputs">
\r
2496 /// The number of inputs
\r
2498 recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func)
\r
2499 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2502 _Initialize(_NumInputs, &_PScheduler);
\r
2506 /// Creates a recalculate block within the specified scheduler, and places it in any schedule
\r
2507 /// group of the scheduler's choosing.
\r
2509 /// <param name="_Scheduler">
\r
2510 /// The scheduler onto which the task's message propagation will be scheduled.
\r
2512 /// <param name="_NumInputs">
\r
2513 /// The number of inputs
\r
2515 /// <param name="_Filter">
\r
2516 /// A filter method placed on this join
\r
2518 recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
2519 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
2522 _Initialize(_NumInputs, &_PScheduler);
\r
2523 register_filter(_Filter);
\r
2527 /// Creates a recalculate block within the specified schedule group. The scheduler is implied
\r
2528 /// by the schedule group.
\r
2530 /// <param name="_PScheduleGroup">
\r
2531 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
2533 /// <param name="_NumInputs">
\r
2534 /// The number of inputs. It sizes the per-input message array and the saved-id array.
\r
// _Func has no <param> entry in the visible comment; presumably it is the recalculation
// callback - confirm against the lines not shown here.
2536 recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func)
\r
// Each input slot starts with a 0 (null) message pointer and a saved message id of -1.
2537 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
// NOTE(review): the rest of the initializer list is not visible in this view.
// NULL is passed for the scheduler; only the schedule group is supplied, by address.
2540 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
2544 /// Creates a recalculate block within the specified schedule group. The scheduler is implied
\r
2545 /// by the schedule group.
\r
2547 /// <param name="_PScheduleGroup">
\r
2548 /// The ScheduleGroup onto which the task's message propagation will be scheduled.
\r
2550 /// <param name="_NumInputs">
\r
2551 /// The number of inputs. It sizes the per-input message array and the saved-id array.
\r
2553 /// <param name="_Filter">
\r
2554 /// A filter method placed on this block; it is handed to register_filter.
\r
// _Func has no <param> entry in the visible comment; presumably it is the recalculation
// callback - confirm against the lines not shown here.
2556 recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter)
\r
// Each input slot starts with a 0 (null) message pointer and a saved message id of -1.
2557 : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),
\r
// NOTE(review): the rest of the initializer list is not visible in this view.
// NULL is passed for the scheduler; only the schedule group is supplied, by address.
2560 _Initialize(_NumInputs, NULL, &_PScheduleGroup);
\r
2561 register_filter(_Filter);
\r
2565 /// Destroys this block: removes its network links and frees the saved-id snapshot buffer.
\r
// NOTE(review): the destructor's own declaration line is not visible in this view.
2569 // Remove all links that are targets of this block
\r
2570 remove_network_links();
\r
// Array form of delete, matching the array new performed in _Initialize.
2572 delete [] _M_savedIdBuffer;
// NOTE(review): no visible line deletes the messages still held in _M_messageArray;
// confirm whether they are released elsewhere.
\r
2577 // propagator_block protected function implementations
\r
2581 /// The main propagate() function for ITarget blocks. Called by a source
\r
2582 /// block, generally within an asynchronous task to send messages to its targets.
\r
2584 /// <param name="_PMessage">
\r
2585 /// The message being propagated. The visible lines only record its id; they do not take the message.
\r
2587 /// <param name="_PSource">
\r
2588 /// The source doing the propagation. It is compared against the connected sources to find its slot.
\r
2591 /// An indication of what the target decided to do with the message.
\r
2594 /// It is important that calls to propagate do *not* take the same lock on the
\r
2595 /// internal structure that is used by Consume and the LWT. Doing so could
\r
2596 /// result in a deadlock with the Consume call.
\r
2598 message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource)
\r
2601 // Find the slot index of this source
\r
// NOTE(review): the declaration of _Slot and the body of the search loop are not visible here.
2604 bool _Found = false;
\r
// The walk over the connected sources stops at the first NULL entry.
2605 for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
\r
2607 if (*_Iter == _PSource)
\r
2618 // If this source was not found in the array, it is not a connected source,
\r
2619 // so decline the message.
\r
2624 // Save the message id: atomically store it in this source's slot and inspect the value it replaced
\r
2626 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)
\r
2628 // The slot held -1 before the exchange (no saved id was waiting there), so attempt a recalculate
\r
2632 // Always return postponed. This message will be consumed
\r
2638 /// Accepts an offered message by the source, transferring ownership to the caller.
\r
2640 /// <param name="_MsgId">
\r
2641 /// The runtime object identity of the message. It must match the head of the output queue to be accepted.
\r
2644 /// A pointer to the message that the caller now has ownership of.
\r
2646 virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
\r
2649 // Peek at the head message in the message buffer. If the ids match,
\r
2650 // dequeue it and transfer ownership.
\r
// The result starts as NULL and is only replaced when the id matches the head.
// NOTE(review): the return statement is not visible here; presumably _Msg is returned.
2652 message<_Output> * _Msg = NULL;
\r
2654 if (_M_messageBuffer.is_head(_MsgId))
\r
2656 _Msg = _M_messageBuffer.dequeue();
\r
2663 /// Reserves a message previously offered by the source.
\r
2665 /// <param name="_MsgId">
\r
2666 /// The runtime object identity of the message.
\r
2669 /// A bool indicating whether the reservation worked or not: true only when _MsgId is at the head of the output queue.
\r
2672 /// After 'reserve' is called, either 'consume' or 'release' must be called.
\r
2674 virtual bool reserve_message(runtime_object_identity _MsgId)
\r
2676 // Allow reservation only if this is the head message; the queue itself is not modified here
\r
2677 return _M_messageBuffer.is_head(_MsgId);
\r
2681 /// Consumes a message previously offered by the source and reserved by the target,
\r
2682 /// transferring ownership to the caller.
\r
2684 /// <param name="_MsgId">
\r
2685 /// The runtime object identity of the message.
\r
2688 /// A pointer to the message that the caller now has ownership of (whatever accept_message returns for this id).
\r
2691 /// Similar to 'accept', but is always preceded by a call to 'reserve'
\r
2693 virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
\r
2695 // Consuming is implemented as accepting: delegate to accept_message with the same id
\r
2696 return accept_message(_MsgId);
\r
2700 /// Releases a previous message reservation.
\r
2702 /// <param name="_MsgId">
\r
2703 /// The runtime object identity of the message. It is expected to identify the head of the output queue.
\r
2705 virtual void release_message(runtime_object_identity _MsgId)
\r
2707 // The head message is the one reserved, so any other id cannot be a valid reservation.
\r
2708 if (!_M_messageBuffer.is_head(_MsgId))
\r
// Report the mismatch to the caller by throwing message_not_found.
2710 throw message_not_found();
\r
2715 /// Resumes propagation after a reservation has been released
\r
2717 virtual void resume_propagation()
\r
2719 // If there are any messages in the output buffer, propagate them out
\r
// NOTE(review): the statement guarded by this check is not visible in this view.
2720 if (_M_messageBuffer.count() > 0)
\r
2727 /// Notification that a target was linked to this source.
\r
2729 /// <param name="_PTarget">
\r
2730 /// A pointer to the newly linked target. The parameter is unnamed in the signature and unused by the visible lines.
\r
2732 virtual void link_target_notification(ITarget<_Output> *)
\r
2734 // If the message queue is blocked due to a reservation,
\r
2735 // there is no need to do any message propagation.
\r
// NOTE(review): the body of this check is not visible; presumably it returns early.
2736 if (_M_pReservedFor != NULL)
\r
// Offer the queued output messages to the targets, starting from the head of the queue.
2741 _Propagate_priority_order(_M_messageBuffer);
\r
2745 /// Builds a new output message and propagates it to the targets of this block.
\r
2746 /// This is called from async_send.
\r
2748 /// <param name="_PMessage">
\r
2749 /// The message being propagated. The parameter is unnamed in the signature and unused by the visible lines.
\r
2751 void propagate_to_any_targets(message<_Output> *)
\r
2753 // Attempt to create a new message from the current inputs
\r
2754 message<_Output> * _Msg = _Create_new_message();
\r
// NOTE(review): _Create_new_message is documented as returning NULL on failure; the lines that
// would handle a NULL result are not visible here - confirm they exist before the enqueue.
2758 // Add the new message to the outbound queue
\r
2759 _M_messageBuffer.enqueue(_Msg);
\r
2761 if (!_M_messageBuffer.is_head(_Msg->msg_id()))
\r
2763 // another message is at the head of the outbound message queue and blocked
\r
// Offer the queued output messages to the targets, starting from the head of the queue.
2769 _Propagate_priority_order(_M_messageBuffer);
\r
2775 // Private Methods
\r
2779 /// Propagate messages in priority order: the head of the queue is offered to the targets in turn,
/// and propagation stops as soon as the head message is not accepted.
\r
2781 /// <param name="_MessageBuffer">
\r
2782 /// Reference to a message queue with messages to be propagated
\r
2784 void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)
\r
// peek() does not remove the message; it stays at the head of the queue while it is offered.
2786 message<_Output> * _Msg = _MessageBuffer.peek();
\r
2788 // If someone has reserved the head message, don't propagate anymore
\r
2789 if (_M_pReservedFor != NULL)
\r
2794 while (_Msg != NULL)
\r
// Start from declined so that having no targets at all also ends the propagation below.
2796 message_status _Status = declined;
\r
2798 // Always start from the first target that linked
\r
2799 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
\r
2801 ITarget<_Output> * _PTarget = *_Iter;
\r
2802 _Status = _PTarget->propagate(_Msg, this);
\r
2804 // Ownership of message changed. Do not propagate this
\r
2805 // message to any other target.
\r
2806 if (_Status == accepted)
\r
2811 // If the target just propagated to has reserved this message, stop
\r
2812 // propagating it to others
\r
2813 if (_M_pReservedFor != NULL)
\r
2819 // If status is anything other than accepted, then the head message
\r
2820 // was not propagated out. Thus, nothing after it in the queue can
\r
2821 // be propagated out. Cease propagation.
\r
2822 if (_Status != accepted)
\r
2827 // Get the next message
\r
2828 _Msg = _MessageBuffer.peek();
\r
2833 /// Create a new output message by applying the recalculate function to the latest payload of every input
\r
2836 /// The created message (NULL if creation failed)
\r
2838 message<_Output> * __cdecl _Create_new_message()
\r
2840 // Check each source and try to consume its pending message. One saved-id slot exists per input.
\r
2841 size_t _NumInputs = _M_savedMessageIdArray.size();
\r
2843 // The iterator _Iter below will ensure that it is safe to touch
\r
2844 // non-NULL source pointers. Take a snapshot.
\r
2845 std::vector<ISource<_Input> *> _Sources;
\r
2846 source_iterator _Iter = _M_connectedSources.begin();
\r
// NOTE(review): the line that advances _Iter is not visible in this view.
2848 while (*_Iter != NULL)
\r
2850 ISource<_Input> * _PSource = *_Iter;
\r
2852 if (_PSource == NULL)
\r
2857 _Sources.push_back(_PSource);
\r
// The snapshot must contain exactly one source per input slot.
2861 if (_Sources.size() != _NumInputs)
\r
2863 // Some of the sources were unlinked. The join is broken
\r
2867 // First, try and reserve all the messages. If a reservation fails,
\r
2868 // then release any reservations that had been made.
\r
2869 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)
\r
2871 // Swap the id to -1 indicating that we have used that value for a recalculate
\r
2872 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);
\r
2874 // If the id is -1, either we have never received a message on that link or the previous message is stored
\r
2875 // in the message array. If it is the former we abort.
\r
2876 // If the id is not -1, we attempt to reserve the message. On failure we abort.
\r
2877 if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL))
\r
2878 || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this)))
\r
2880 // Abort. Release all reservations made up until this block,
\r
2881 // and wait for another message to arrive.
\r
// NOTE(review): this loop covers slots before i only; whether the id swapped out of slot i
// itself is put back is not visible here - confirm.
2882 for (size_t j = 0; j < i; j++)
\r
2884 if (_M_savedIdBuffer[j] != -1)
\r
2886 _Sources[j]->release(_M_savedIdBuffer[j], this);
\r
// Put the snapped id back only if the slot still holds -1, so an id stored in the meantime is kept.
2887 _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1);
\r
2891 // Return NULL to indicate that the create failed
\r
2896 // Since everything has been reserved, consume all the messages.
\r
2897 // This is guaranteed to return true.
\r
// NOTE(review): the line that increments _NewMessages is not visible in this view.
2898 size_t _NewMessages = 0;
\r
2899 for (size_t i = 0; i < _NumInputs; i++)
\r
// Only slots whose snapped id is not -1 have a freshly reserved message to consume.
2901 if (_M_savedIdBuffer[i] != -1)
\r
2903 // Delete previous message since we have a new one
\r
2904 if (_M_messageArray[i] != NULL)
\r
2906 delete _M_messageArray[i];
\r
2908 _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
\r
// Reset the snapshot entry so the buffer is back to all -1 for the next attempt.
2909 _M_savedIdBuffer[i] = -1;
\r
2914 if (_NewMessages == 0)
\r
2916 // There is no need to recalculate if we did not consume a new message
\r
// Gather the payload of every input, in slot order, as the argument for the recalculate function.
2920 std::vector<_Input> _OutputVector;
\r
2921 for (size_t i = 0; i < _NumInputs; i++)
\r
2923 _ASSERTE(_M_messageArray[i] != NULL);
\r
2924 _OutputVector.push_back(_M_messageArray[i]->payload);
\r
// The result is wrapped in a newly allocated message that is returned to the caller.
2927 _Output _Out = _M_pFunc(_OutputVector);
\r
2928 return (new message<_Output>(_Out));
\r
2932 /// Initialize the recalculate block: set up the source/target machinery, bound the number of
/// connected sources, and allocate the saved-id snapshot buffer.
\r
2934 /// <param name="_NumInputs">
\r
2935 /// The number of inputs. Used as the bound on connected sources and as the snapshot buffer length.
\r
// _PScheduler and _PScheduleGroup both default to NULL and are forwarded unchanged.
2937 void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
\r
2939 initialize_source_and_target(_PScheduler, _PScheduleGroup);
\r
2941 _M_connectedSources.set_bound(_NumInputs);
\r
2943 // A buffer is needed to snap off saved message ids to; it is freed with delete [] on destruction.
\r
2944 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
\r
// Fill every byte with 0xFF so each entry reads as -1; assumes runtime_object_identity is a
// two's-complement integer type - TODO confirm.
2945 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
\r
2948 // An array containing the most recently consumed message of each input (NULL until one is consumed).
\r
2949 std::vector<message<_Input>*> _M_messageArray;
\r
2951 // An array containing the msg ids of messages propagated to this block, one slot per input.
\r
2952 // A slot holds -1 when no id is waiting; otherwise it holds the message id of any message
\r
// NOTE(review): the end of the original sentence above is not visible in this view.
2954 std::vector<runtime_object_identity> _M_savedMessageIdArray;
\r
2956 // Buffer for snapping saved ids while a new message is being created; allocated in _Initialize
\r
2957 runtime_object_identity * _M_savedIdBuffer;
\r
2959 // The recalculate method called by this block with the vector of input payloads
\r
2960 _Recalculate_method _M_pFunc;
\r
2962 // Queue to hold output messages
\r
2963 MessageQueue<_Output> _M_messageBuffer;
\r
2967 // Container class to hold a join_transform block and keep unbounded buffers in front of each input.
// Sources are attached with add_input and the combined output is exposed through link_target.
\r
2969 template<class _Input, class _Output>
\r
2970 class buffered_join
\r
// Callable that turns the vector of joined inputs into a single output value.
2972 typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;
\r
// Creates _NumInputs unbounded buffers and one greedy join_transform, and links every buffer to the join.
2975 buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs)
\r
2977 m_buffers = new unbounded_buffer<_Input>*[_NumInputs];
\r
2978 m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func);
\r
2980 for(int i = 0; i < _NumInputs; i++)
\r
2982 m_buffers[i] = new unbounded_buffer<_Input>;
\r
2983 m_buffers[i]->link_target(m_join);
\r
// Cleanup: delete each buffer, then the array of buffer pointers.
// NOTE(review): the enclosing declaration line is not visible, and no visible line deletes
// m_join - confirm it is released on a line not shown here.
2989 for(int i = 0; i < m_numInputs; i++)
\r
2990 delete m_buffers[i];
\r
2991 delete [] m_buffers;
\r
2995 // Add input takes _PSource and connects it to the next input on this block
\r
2996 void add_input(ISource<_Input> * _PSource)
\r
// NOTE(review): the visible lines neither advance m_currentInput nor check it against
// m_numInputs; presumably that happens on a line not shown here - confirm.
2998 _PSource->link_target(m_buffers[m_currentInput]);
\r
3002 // link_target links this container block to _PTarget by linking the inner join to it
\r
3003 void link_target(ITarget<_Output> * _PTarget)
\r
3005 m_join->link_target(_PTarget);
\r
// Index of the buffer that the next add_input call links its source to.
3009 int m_currentInput;
\r
// Array of heap-allocated buffers; its length is the _NumInputs given to the constructor.
3011 unbounded_buffer<_Input> ** m_buffers;
\r
// The greedy join that every buffer feeds into.
3012 join_transform<_Input,_Output,greedy> * m_join;
\r
3014 } // namespace Concurrency
\r