]> git.sesse.net Git - casparcg/blob - concrt_extras/agents_extras.h
(no commit message)
[casparcg] / concrt_extras / agents_extras.h
1 //--------------------------------------------------------------------------\r
2 // \r
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. \r
4 // \r
5 //  File: agents_extras.h\r
6 //\r
7 //  Implementation of various useful message blocks\r
8 //\r
9 //--------------------------------------------------------------------------\r
10 \r
11 #pragma once\r
12 \r
13 #include <agents.h>\r
14 \r
15 // bounded_buffer uses a map\r
16 #include <map>\r
17 #include <queue>\r
18 \r
19 namespace Concurrency\r
20 {\r
21     /// <summary>\r
22     ///     Simple queue class for storing messages.\r
23     /// </summary>\r
24     /// <typeparam name="_Type">\r
25     ///     The payload type of messages stored in this queue.\r
26     /// </typeparam>\r
27     template <class _Type>\r
28     class MessageQueue\r
29     {\r
30     public:\r
31         typedef message<_Type> _Message;\r
32 \r
33         /// <summary>\r
34         ///     Constructs an initially empty queue.\r
35         /// </summary>\r
36         MessageQueue()\r
37         {\r
38         }\r
39 \r
40         /// <summary>\r
41         ///     Removes and deletes any messages remaining in the queue.\r
42         /// </summary>\r
43         ~MessageQueue() \r
44         {\r
45             _Message * _Msg = dequeue();\r
46             while (_Msg != NULL)\r
47             {\r
48                 delete _Msg;\r
49                 _Msg = dequeue();\r
50             }\r
51         }\r
52 \r
53         /// <summary>\r
54         ///     Add an item to the queue.\r
55         /// </summary>\r
56         /// <param name="_Msg">\r
57         ///     Message to add.\r
58         /// </param>\r
59         void enqueue(_Message *_Msg)\r
60         {\r
61             _M_queue.push(_Msg);\r
62         }\r
63 \r
64         /// <summary>\r
65         ///     Dequeue an item from the head of queue.\r
66         /// </summary>\r
67         /// <returns>\r
68         ///     Returns a pointer to the message found at the head of the queue.\r
69         /// </returns>\r
70         _Message * dequeue()\r
71         {\r
72             _Message * _Msg = NULL;\r
73 \r
74             if (!_M_queue.empty())\r
75             {\r
76                 _Msg = _M_queue.front();\r
77                 _M_queue.pop();\r
78             }\r
79 \r
80             return _Msg;\r
81         }\r
82 \r
83         /// <summary>\r
84         ///     Return the item at the head of the queue, without dequeuing\r
85         /// </summary>\r
86         /// <returns>\r
87         ///     Returns a pointer to the message found at the head of the queue.\r
88         /// </returns>\r
89         _Message * peek() const\r
90         {\r
91             _Message * _Msg = NULL;\r
92 \r
93             if (!_M_queue.empty())\r
94             {\r
95                 _Msg = _M_queue.front();\r
96             }\r
97 \r
98             return _Msg;\r
99         }\r
100 \r
101         /// <summary>\r
102         ///     Returns the number of items currently in the queue.\r
103         /// </summary>\r
104         /// <returns>\r
105         /// Size of the queue.\r
106         /// </returns>\r
107         size_t count() const\r
108         {\r
109             return _M_queue.size();\r
110         }\r
111 \r
112         /// <summary>\r
113         ///     Checks to see if specified msg id is at the head of the queue.\r
114         /// </summary>\r
115         /// <param name="_MsgId">\r
116         ///     Message id to check for.\r
117         /// </param>\r
118         /// <returns>\r
119         ///     True if a message with specified id is at the head, false otherwise.\r
120         /// </returns>\r
121         bool is_head(const runtime_object_identity _MsgId) const\r
122         {\r
123             _Message * _Msg = peek();\r
124             if(_Msg != NULL)\r
125             {\r
126                 return _Msg->msg_id() == _MsgId;\r
127             }\r
128             return false;\r
129         }\r
130 \r
131     private:\r
132         \r
133         std::queue<_Message *> _M_queue;\r
134     };\r
135 \r
136     /// <summary>\r
137     ///     Simple queue implementation that takes into account priority\r
138     ///     using the comparison operator <.\r
139     /// </summary>\r
140     /// <typeparam name="_Type">\r
141     ///     The payload type of messages stored in this queue.\r
142     /// </typeparam>\r
143     template <class _Type>\r
144     class PriorityQueue\r
145     {\r
146     public:\r
147         /// <summary>\r
148         ///     Constructs an initially empty queue.\r
149         /// </summary>\r
150         PriorityQueue() : _M_pHead(NULL), _M_count(0) {}\r
151 \r
152         /// <summary>\r
153         ///     Removes and deletes any messages remaining in the queue.\r
154         /// </summary>\r
155         ~PriorityQueue() \r
156         {\r
157             message<_Type> * _Msg = dequeue();\r
158             while (_Msg != NULL)\r
159             {\r
160                 delete _Msg;\r
161                 _Msg = dequeue();\r
162             }\r
163         }\r
164 \r
165         /// <summary>\r
166         ///     Add an item to the queue, comparisons using the 'payload' field\r
167         ///     will determine the location in the queue.\r
168         /// </summary>\r
169         /// <param name="_Msg">\r
170         ///     Message to add.\r
171         /// </param>\r
172         /// <param name="fCanReplaceHead">\r
173         ///     True if this new message can be inserted at the head.\r
174         /// </param>\r
175         void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true)\r
176         {\r
177             MessageNode *_Element = new MessageNode();\r
178             _Element->_M_pMsg = _Msg;\r
179 \r
180             // Find location to insert.\r
181             MessageNode *pCurrent = _M_pHead;\r
182             MessageNode *pPrev = NULL;\r
183             if(!fInsertAtHead && pCurrent != NULL)\r
184             {\r
185                 pPrev = pCurrent;\r
186                 pCurrent = pCurrent->_M_pNext;\r
187             }\r
188             while(pCurrent != NULL)\r
189             {\r
190                 if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload)\r
191                 {\r
192                     break;\r
193                 }\r
194                 pPrev = pCurrent;\r
195                 pCurrent = pCurrent->_M_pNext;\r
196             }\r
197 \r
198             // Insert at head.\r
199             if(pPrev == NULL)\r
200             {\r
201                 _M_pHead = _Element;\r
202             }\r
203             else\r
204             {\r
205                 pPrev->_M_pNext = _Element;\r
206             }\r
207 \r
208             // Last item in queue.\r
209             if(pCurrent == NULL)\r
210             {\r
211                 _Element->_M_pNext = NULL;\r
212             }\r
213             else\r
214             {\r
215                 _Element->_M_pNext = pCurrent;\r
216             }\r
217 \r
218             ++_M_count;\r
219         }\r
220 \r
221         /// <summary>\r
222         ///     Dequeue an item from the head of queue.\r
223         /// </summary>\r
224         /// <returns>\r
225         ///     Returns a pointer to the message found at the head of the queue.\r
226         /// </returns>\r
227         message<_Type> * dequeue()\r
228         {\r
229             if (_M_pHead == NULL) \r
230             {\r
231                 return NULL;\r
232             }\r
233 \r
234             MessageNode *_OldHead = _M_pHead;\r
235             message<_Type> * _Result = _OldHead->_M_pMsg;\r
236 \r
237             _M_pHead = _OldHead->_M_pNext;\r
238 \r
239             delete _OldHead;\r
240 \r
241             if(--_M_count == 0)\r
242             {\r
243                 _M_pHead = NULL;\r
244             }\r
245             return _Result;\r
246         }\r
247 \r
248         /// <summary>\r
249         ///     Return the item at the head of the queue, without dequeuing\r
250         /// </summary>\r
251         /// <returns>\r
252         ///     Returns a pointer to the message found at the head of the queue.\r
253         /// </returns>\r
254         message<_Type> * peek() const\r
255         {\r
256             if(_M_count != 0)\r
257             {\r
258                 return _M_pHead->_M_pMsg;\r
259             }\r
260             return NULL;\r
261         }\r
262 \r
263         /// <summary>\r
264         ///     Returns the number of items currently in the queue.\r
265         /// </summary>\r
266         /// <returns>\r
267         ///     Size of the queue.\r
268         /// </returns>\r
269         size_t count() const\r
270         {\r
271             return _M_count;\r
272         }\r
273 \r
274         /// <summary>\r
275         ///     Checks to see if specified msg id is at the head of the queue.\r
276         /// </summary>\r
277         /// <param name="_MsgId">\r
278         ///     Message id to check for.\r
279         /// </param>\r
280         /// <returns>\r
281         ///     True if a message with specified id is at the head, false otherwise.\r
282         /// </returns>\r
283         bool is_head(const runtime_object_identity _MsgId) const\r
284         {\r
285             if(_M_count != 0)\r
286             {\r
287                 return _M_pHead->_M_pMsg->msg_id() == _MsgId;\r
288             }\r
289             return false;\r
290         }\r
291 \r
292     private:\r
293         \r
294         // Used to store individual message nodes.\r
295         struct MessageNode\r
296         {\r
297             MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {}\r
298             message<_Type> * _M_pMsg;\r
299             MessageNode * _M_pNext;\r
300         };\r
301 \r
302         // A pointer to the head of the queue.\r
303         MessageNode * _M_pHead;\r
304 \r
305         // The number of elements presently stored in the queue.\r
306         size_t _M_count;\r
307     };\r
308         \r
309     /// <summary>\r
310     ///        priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine\r
311     ///     order when offering to targets. Besides this it acts exactly like an unbounded_buffer.\r
312     /// </summary>\r
313     /// <typeparam name="_Type">\r
314     ///     The payload type of messages stored and propagated by the buffer.\r
315     /// </typeparam>\r
316     template<class _Type>\r
317     class priority_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
318     {\r
319     public:\r
320 \r
321         /// <summary>\r
322         ///     Creates an priority_buffer within the default scheduler, and places it any schedule\r
323         ///     group of the scheduler\92s choosing.\r
324         /// </summary>\r
325         priority_buffer() \r
326         {\r
327             initialize_source_and_target();\r
328         }\r
329 \r
330         /// <summary>\r
331         ///     Creates an priority_buffer within the default scheduler, and places it any schedule\r
332         ///     group of the scheduler\92s choosing.\r
333         /// </summary>\r
334         /// <param name="_Filter">\r
335         ///     A reference to a filter function.\r
336         /// </param>\r
337         priority_buffer(filter_method const& _Filter)\r
338         {\r
339             initialize_source_and_target();\r
340             register_filter(_Filter);\r
341         }\r
342 \r
343         /// <summary>\r
344         ///     Creates an priority_buffer within the specified scheduler, and places it any schedule\r
345         ///     group of the scheduler\92s choosing.\r
346         /// </summary>\r
347         /// <param name="_PScheduler">\r
348         ///     A reference to a scheduler instance.\r
349         /// </param>\r
350         priority_buffer(Scheduler& _PScheduler)\r
351         {\r
352             initialize_source_and_target(&_PScheduler);\r
353         }\r
354 \r
355         /// <summary>\r
356         ///     Creates an priority_buffer within the specified scheduler, and places it any schedule\r
357         ///     group of the scheduler\92s choosing.\r
358         /// </summary>\r
359         /// <param name="_PScheduler">\r
360         ///     A reference to a scheduler instance.\r
361         /// </param>\r
362         /// <param name="_Filter">\r
363         ///     A reference to a filter function.\r
364         /// </param>\r
365         priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter) \r
366         {\r
367             initialize_source_and_target(&_PScheduler);\r
368             register_filter(_Filter);\r
369         }\r
370 \r
371         /// <summary>\r
372         ///     Creates an priority_buffer within the specified schedule group.  The scheduler is implied\r
373         ///     by the schedule group.\r
374         /// </summary>\r
375         /// <param name="_PScheduleGroup">\r
376         ///     A reference to a schedule group.\r
377         /// </param>\r
378         priority_buffer(ScheduleGroup& _PScheduleGroup)\r
379         {\r
380             initialize_source_and_target(NULL, &_PScheduleGroup);\r
381         }\r
382 \r
383         /// <summary>\r
384         ///     Creates an priority_buffer within the specified schedule group.  The scheduler is implied\r
385         ///     by the schedule group.\r
386         /// </summary>\r
387         /// <param name="_PScheduleGroup">\r
388         ///     A reference to a schedule group.\r
389         /// </param>\r
390         /// <param name="_Filter">\r
391         ///     A reference to a filter function.\r
392         /// </param>\r
393         priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
394         {\r
395             initialize_source_and_target(NULL, &_PScheduleGroup);\r
396             register_filter(_Filter);\r
397         }\r
398 \r
399         /// <summary>\r
400         ///     Cleans up any resources that may have been created by the priority_buffer.\r
401         /// </summary>\r
402         ~priority_buffer()\r
403         {\r
404             // Remove all links\r
405             remove_network_links();\r
406         }\r
407 \r
408         /// <summary>\r
409         ///     Add an item to the priority_buffer\r
410         /// </summary>\r
411         /// <param name="_Item">\r
412         ///     A reference to the item to add.\r
413         /// </param>\r
414         /// <returns>\r
415         ///     A boolean indicating whether the data was accepted.\r
416         /// </returns>\r
417         bool enqueue(_Type const& _Item)\r
418         {\r
419             return Concurrency::send<_Type>(this, _Item);\r
420         }\r
421 \r
422         /// <summary>\r
423         ///     Remove an item from the priority_buffer\r
424         /// </summary>\r
425         /// <returns>\r
426         ///     The message payload.\r
427         /// </returns>\r
428         _Type dequeue()\r
429         {\r
430             return receive<_Type>(this);\r
431         }\r
432 \r
433     protected:\r
434 \r
435         /// <summary>\r
436         ///     The main propagate() function for ITarget blocks.  Called by a source\r
437         ///     block, generally within an asynchronous task to send messages to its targets.\r
438         /// </summary>\r
439         /// <param name="_PMessage">\r
440         ///     A pointer to the message.\r
441         /// </param>\r
442         /// <param name="_PSource">\r
443         ///     A pointer to the source block offering the message.\r
444         /// </param>\r
445         /// <returns>\r
446         ///     An indication of what the target decided to do with the message.\r
447         /// </returns>\r
448         /// <remarks>\r
449         ///     It is important that calls to propagate do *not* take the same lock on the\r
450         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
451         ///     result in a deadlock with the Consume call.  (in the case of the priority_buffer,\r
452         ///     this lock is the m_internalLock)\r
453         /// </remarks>\r
454         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
455         {\r
456             message_status _Result = accepted;\r
457             //\r
458             // Accept the message being propagated\r
459             // Note: depending on the source block propagating the message\r
460             // this may not necessarily be the same message (pMessage) first\r
461             // passed into the function.\r
462             //\r
463             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
464 \r
465             if (_PMessage != NULL)\r
466             {\r
467                 async_send(_PMessage);\r
468             }\r
469             else\r
470             {\r
471                 _Result = missed;\r
472             }\r
473 \r
474             return _Result;\r
475         }\r
476 \r
477         /// <summary>\r
478         ///     Synchronously sends a message to this block.  When this function completes the message will\r
479         ///     already have propagated into the block.\r
480         /// </summary>\r
481         /// <param name="_PMessage">\r
482         ///     A pointer to the message.\r
483         /// </param>\r
484         /// <param name="_PSource">\r
485         ///     A pointer to the source block offering the message.\r
486         /// </param>\r
487         /// <returns>\r
488         ///     An indication of what the target decided to do with the message.\r
489         /// </returns>\r
490         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
491         {\r
492             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
493 \r
494             if (_PMessage != NULL)\r
495             {\r
496                 sync_send(_PMessage);\r
497             }\r
498             else\r
499             {\r
500                 return missed;\r
501             }\r
502 \r
503             return accepted;\r
504         }\r
505 \r
506         /// <summary>\r
507         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
508         /// </summary>\r
509         /// <param name="_MsgId">\r
510         ///     The runtime object identity of the message.\r
511         /// </param>\r
512         /// <returns>\r
513         ///     A pointer to the message that the caller now has ownership of.\r
514         /// </returns>\r
515         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
516         {\r
517             //\r
518             // Peek at the head message in the message buffer.  If the Ids match\r
519             // dequeue and transfer ownership\r
520             //\r
521             message<_Type> * _Msg = NULL;\r
522 \r
523             if (_M_messageBuffer.is_head(_MsgId))\r
524             {\r
525                 _Msg = _M_messageBuffer.dequeue();\r
526             }\r
527 \r
528             return _Msg;\r
529         }\r
530 \r
531         /// <summary>\r
532         ///     Reserves a message previously offered by the source.\r
533         /// </summary>\r
534         /// <param name="_MsgId">\r
535         ///     The runtime object identity of the message.\r
536         /// </param>\r
537         /// <returns>\r
538         ///     A Boolean indicating whether the reservation worked or not.\r
539         /// </returns>\r
540         /// <remarks>\r
541         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
542         /// </remarks>\r
543         virtual bool reserve_message(runtime_object_identity _MsgId)\r
544         {\r
545             // Allow reservation if this is the head message\r
546             return _M_messageBuffer.is_head(_MsgId);\r
547         }\r
548 \r
549         /// <summary>\r
550         ///     Consumes a message that was reserved previously.\r
551         /// </summary>\r
552         /// <param name="_MsgId">\r
553         ///     The runtime object identity of the message.\r
554         /// </param>\r
555         /// <returns>\r
556         ///     A pointer to the message that the caller now has ownership of.\r
557         /// </returns>\r
558         /// <remarks>\r
559         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
560         /// </remarks>\r
561         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
562         {\r
563             // By default, accept the message\r
564             return accept_message(_MsgId);\r
565         }\r
566 \r
567         /// <summary>\r
568         ///     Releases a previous message reservation.\r
569         /// </summary>\r
570         /// <param name="_MsgId">\r
571         ///     The runtime object identity of the message.\r
572         /// </param>\r
573         virtual void release_message(runtime_object_identity _MsgId)\r
574         {\r
575             // The head message is the one reserved.\r
576             if (!_M_messageBuffer.is_head(_MsgId))\r
577             {\r
578                 throw message_not_found();\r
579             }\r
580         }\r
581 \r
582         /// <summary>\r
583         ///    Resumes propagation after a reservation has been released\r
584         /// </summary>\r
585         virtual void resume_propagation()\r
586         {\r
587             // If there are any messages in the buffer, propagate them out\r
588             if (_M_messageBuffer.count() > 0)\r
589             {\r
590                 async_send(NULL);\r
591             }\r
592         }\r
593 \r
594         /// <summary>\r
595         ///     Notification that a target was linked to this source.\r
596         /// </summary>\r
597         /// <param name="_PTarget">\r
598         ///     A pointer to the newly linked target.\r
599         /// </param>\r
600         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
601         {\r
602             // If the message queue is blocked due to reservation\r
603             // there is no need to do any message propagation\r
604             if (_M_pReservedFor != NULL)\r
605             {\r
606                 return;\r
607             }\r
608 \r
609             message<_Type> * _Msg = _M_messageBuffer.peek();\r
610 \r
611             if (_Msg != NULL)\r
612             {\r
613                 // Propagate the head message to the new target\r
614                 message_status _Status = _PTarget->propagate(_Msg, this);\r
615 \r
616                 if (_Status == accepted)\r
617                 {\r
618                     // The target accepted the message, restart propagation.\r
619                     propagate_to_any_targets(NULL);\r
620                 }\r
621 \r
622                 // If the status is anything other than accepted, then leave\r
623                 // the message queue blocked.\r
624             }\r
625         }\r
626 \r
627         /// <summary>\r
628         /// Takes the message and propagates it to all the targets of this priority_buffer.\r
629         /// </summary>\r
630         /// <param name="_PMessage">\r
631         ///     A pointer to a new message.\r
632         /// </param>\r
633         virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
634         {\r
635             // Enqueue pMessage to the internal unbounded buffer queue if it is non-NULL.\r
636             // _PMessage can be NULL if this LWT was the result of a Repropagate call\r
637             // out of a Consume or Release (where no new message is queued up, but\r
638             // everything remaining in the priority_buffer needs to be propagated out)\r
639             if (_PMessage != NULL)\r
640             {\r
641                 message<_Type> *pPrevHead = _M_messageBuffer.peek();\r
642 \r
643                 // If a reservation is held make sure to not insert this new\r
644                 // message before it.\r
645                 if(_M_pReservedFor != NULL)\r
646                 {\r
647                     _M_messageBuffer.enqueue(_PMessage, false);\r
648                 }\r
649                 else\r
650                 {\r
651                     _M_messageBuffer.enqueue(_PMessage);\r
652                 }\r
653 \r
654                 // If the head message didn't change, we can safely assume that\r
655                 // the head message is blocked and waiting on Consume(), Release() or a new\r
656                 // link_target()\r
657                 if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id()))\r
658                 {\r
659                     return;\r
660                 }\r
661             }\r
662 \r
663             // Attempt to propagate messages to all the targets\r
664             _Propagate_priority_order();\r
665         }\r
666 \r
667     private:\r
668 \r
669         /// <summary>\r
670         ///     Attempts to propagate out any messages currently in the block.\r
671         /// </summary>\r
672         void _Propagate_priority_order()\r
673         {\r
674             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
675 \r
676             // If someone has reserved the _Head message, don't propagate anymore\r
677             if (_M_pReservedFor != NULL)\r
678             {\r
679                 return;\r
680             }\r
681 \r
682             while (_Msg != NULL)\r
683             {\r
684                 message_status _Status = declined;\r
685 \r
686                 // Always start from the first target that linked.\r
687                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
688                 {\r
689                     ITarget<_Target_type> * _PTarget = *_Iter;\r
690                     _Status = _PTarget->propagate(_Msg, this);\r
691 \r
692                     // Ownership of message changed. Do not propagate this\r
693                     // message to any other target.\r
694                     if (_Status == accepted)\r
695                     {\r
696                         break;\r
697                     }\r
698 \r
699                     // If the target just propagated to reserved this message, stop\r
700                     // propagating it to others.\r
701                     if (_M_pReservedFor != NULL)\r
702                     {\r
703                         break;\r
704                     }\r
705                 }\r
706 \r
707                 // If status is anything other than accepted, then the head message\r
708                 // was not propagated out.  Thus, nothing after it in the queue can\r
709                 // be propagated out.  Cease propagation.\r
710                 if (_Status != accepted)\r
711                 {\r
712                     break;\r
713                 }\r
714 \r
715                 // Get the next message\r
716                 _Msg = _M_messageBuffer.peek();\r
717             }\r
718         }\r
719 \r
720         /// <summary>\r
721         ///     Priority Queue used to store messages.\r
722         /// </summary>\r
723         PriorityQueue<_Type> _M_messageBuffer;\r
724 \r
725         //\r
726         // Hide assignment operator and copy constructor.\r
727         //\r
728         priority_buffer const &operator =(priority_buffer const&);  // no assignment operator\r
729         priority_buffer(priority_buffer const &);                   // no copy constructor\r
730     };\r
731 \r
732 \r
733     /// <summary>\r
734     ///    A bounded_buffer implementation. Once the capacity is reached it will save the offered message\r
735     ///    id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume\r
736     ///    any of the postponed messages. Preference is given to previously offered messages before new ones.\r
737     ///\r
738     ///    NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. \r
739     ///          Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer\r
740     ///          implementation uses a critical_section, several interlocked operations, and additional calls to async_send.\r
741     ///          These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.\r
742     /// </summary>\r
743     /// <typeparam name="_Type">\r
744     ///     The payload type of messages stored and propagated by the buffer.\r
745     /// </typeparam>\r
746     template<class _Type>\r
747     class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
748     {\r
749     public:\r
750         /// <summary>\r
751         ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
752         ///     group of the scheduler\92s choosing.\r
753         /// </summary>\r
754         bounded_buffer(const size_t capacity)\r
755             : _M_capacity(capacity), _M_currentSize(0)\r
756         {\r
757             initialize_source_and_target();\r
758         }\r
759 \r
760         /// <summary>\r
761         ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
762         ///     group of the scheduler\92s choosing.\r
763         /// </summary>\r
764         /// <param name="_Filter">\r
765         ///     A reference to a filter function.\r
766         /// </param>\r
767         bounded_buffer(const size_t capacity, filter_method const& _Filter)\r
768             : _M_capacity(capacity), _M_currentSize(0)\r
769         {\r
770             initialize_source_and_target();\r
771             register_filter(_Filter);\r
772         }\r
773 \r
774         /// <summary>\r
775         ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
776         ///     group of the scheduler\92s choosing.\r
777         /// </summary>\r
778         /// <param name="_PScheduler">\r
779         ///     A reference to a scheduler instance.\r
780         /// </param>\r
781         bounded_buffer(const size_t capacity, Scheduler& _PScheduler)\r
782             : _M_capacity(capacity), _M_currentSize(0)\r
783         {\r
784             initialize_source_and_target(&_PScheduler);\r
785         }\r
786 \r
787         /// <summary>\r
788         ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
789         ///     group of the scheduler\92s choosing.\r
790         /// </summary>\r
791         /// <param name="_PScheduler">\r
792         ///     A reference to a scheduler instance.\r
793         /// </param>\r
794         /// <param name="_Filter">\r
795         ///     A reference to a filter function.\r
796         /// </param>\r
797         bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) \r
798             : _M_capacity(capacity), _M_currentSize(0)\r
799         {\r
800             initialize_source_and_target(&_PScheduler);\r
801             register_filter(_Filter);\r
802         }\r
803 \r
804         /// <summary>\r
805         ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
806         ///     by the schedule group.\r
807         /// </summary>\r
808         /// <param name="_PScheduleGroup">\r
809         ///     A reference to a schedule group.\r
810         /// </param>\r
811         bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)\r
812             : _M_capacity(capacity), _M_currentSize(0)\r
813         {\r
814             initialize_source_and_target(NULL, &_PScheduleGroup);\r
815         }\r
816 \r
817         /// <summary>\r
818         ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
819         ///     by the schedule group.\r
820         /// </summary>\r
821         /// <param name="_PScheduleGroup">\r
822         ///     A reference to a schedule group.\r
823         /// </param>\r
824         /// <param name="_Filter">\r
825         ///     A reference to a filter function.\r
826         /// </param>\r
827         bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
828             : _M_capacity(capacity), _M_currentSize(0)\r
829         {\r
830             initialize_source_and_target(NULL, &_PScheduleGroup);\r
831             register_filter(_Filter);\r
832         }\r
833 \r
834         /// <summary>\r
835         ///     Cleans up any resources that may have been used by the bounded_buffer.\r
836         /// </summary>\r
837         ~bounded_buffer()\r
838         {\r
839             // Remove all links\r
840             remove_network_links();\r
841         }\r
842 \r
843         /// <summary>\r
844         ///     Add an item to the bounded_buffer.\r
845         /// </summary>\r
846         /// <param name="_Item">\r
847         ///     A reference to the item to add.\r
848         /// </param>\r
849         /// <returns>\r
850         ///     A boolean indicating whether the data was accepted.\r
851         /// </returns>\r
852         bool enqueue(_Type const& _Item)\r
853         {\r
854             return Concurrency::send<_Type>(this, _Item);\r
855         }\r
856 \r
857         /// <summary>\r
858         ///     Remove an item from the bounded_buffer.\r
859         /// </summary>\r
860         /// <returns>\r
861         ///     The message payload.\r
862         /// </returns>\r
863         _Type dequeue()\r
864         {\r
865             return receive<_Type>(this);\r
866         }\r
867 \r
868     protected:\r
869 \r
870         /// <summary>\r
871         ///     The main propagate() function for ITarget blocks.  Called by a source\r
872         ///     block, generally within an asynchronous task to send messages to its targets.\r
873         /// </summary>\r
874         /// <param name="_PMessage">\r
875         ///     A pointer to the message.\r
876         /// </param>\r
877         /// <param name="_PSource">\r
878         ///     A pointer to the source block offering the message.\r
879         /// </param>\r
880         /// <returns>\r
881         ///     An indication of what the target decided to do with the message.\r
882         /// </returns>\r
883         /// <remarks>\r
884         ///     It is important that calls to propagate do *not* take the same lock on the\r
885         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
886         ///     result in a deadlock with the Consume call.  (in the case of the bounded_buffer,\r
887         ///     this lock is the m_internalLock)\r
888         /// </remarks>\r
889         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
890         {\r
891             message_status _Result = accepted;\r
892             \r
893             // Check current capacity. \r
894             if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
895             {\r
896                 // Postpone the message, buffer is full.\r
897                 _InterlockedDecrement(&_M_currentSize);\r
898                 _Result = postponed;\r
899 \r
900                 // Save off the message id from this source to later try\r
901                 // and reserve/consume when more space is free.\r
902                 {\r
903                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
904                     _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
905                 }\r
906 \r
907                 async_send(NULL);\r
908             }\r
909             else\r
910             {\r
911                 //\r
912                 // Accept the message being propagated\r
913                 // Note: depending on the source block propagating the message\r
914                 // this may not necessarily be the same message (pMessage) first\r
915                 // passed into the function.\r
916                 //\r
917                 _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
918 \r
919                 if (_PMessage != NULL)\r
920                 {\r
921                     async_send(_PMessage);\r
922                 }\r
923                 else\r
924                 {\r
925                     // Didn't get a message so need to decrement.\r
926                     _InterlockedDecrement(&_M_currentSize);\r
927                     _Result = missed;\r
928                     async_send(NULL);\r
929                 }\r
930             }\r
931 \r
932             return _Result;\r
933         }\r
934 \r
935         /// <summary>\r
936         ///     Synchronously sends a message to this block.  When this function completes the message will\r
937         ///     already have propagated into the block.\r
938         /// </summary>\r
939         /// <param name="_PMessage">\r
940         ///     A pointer to the message.\r
941         /// </param>\r
942         /// <param name="_PSource">\r
943         ///     A pointer to the source block offering the message.\r
944         /// </param>\r
945         /// <returns>\r
946         ///     An indication of what the target decided to do with the message.\r
947         /// </returns>\r
948         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
949         {\r
950             message_status _Result = accepted;\r
951             \r
952             // Check current capacity. \r
953             if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
954             {\r
955                 // Postpone the message, buffer is full.\r
956                 _InterlockedDecrement(&_M_currentSize);\r
957                 _Result = postponed;\r
958 \r
959                 // Save off the message id from this source to later try\r
960                 // and reserve/consume when more space is free.\r
961                 {\r
962                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
963                     _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
964                 }\r
965 \r
966                 async_send(NULL);\r
967             }\r
968             else\r
969             {\r
970                 //\r
971                 // Accept the message being propagated\r
972                 // Note: depending on the source block propagating the message\r
973                 // this may not necessarily be the same message (pMessage) first\r
974                 // passed into the function.\r
975                 //\r
976                 _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
977 \r
978                 if (_PMessage != NULL)\r
979                 {\r
980                     async_send(_PMessage);\r
981                 }\r
982                 else\r
983                 {\r
984                     // Didn't get a message so need to decrement.\r
985                     _InterlockedDecrement(&_M_currentSize);\r
986                     _Result = missed;\r
987                     async_send(NULL);\r
988                 }\r
989             }\r
990 \r
991             return _Result;\r
992         }\r
993 \r
994         /// <summary>\r
995         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
996         /// </summary>\r
997         /// <param name="_MsgId">\r
998         ///     The runtime object identity of the message.\r
999         /// </param>\r
1000         /// <returns>\r
1001         ///     A pointer to the message that the caller now has ownership of.\r
1002         /// </returns>\r
1003         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
1004         {\r
1005             //\r
1006             // Peek at the head message in the message buffer.  If the Ids match\r
1007             // dequeue and transfer ownership\r
1008             //\r
1009             message<_Type> * _Msg = NULL;\r
1010 \r
1011             if (_M_messageBuffer.is_head(_MsgId))\r
1012             {\r
1013                 _Msg = _M_messageBuffer.dequeue();\r
1014 \r
1015                 // Give preference to any previously postponed messages\r
1016                 // before decrementing current size.\r
1017                 if(!try_consume_msg())\r
1018                 {\r
1019                     _InterlockedDecrement(&_M_currentSize);\r
1020                 }\r
1021             }\r
1022 \r
1023             return _Msg;\r
1024         }\r
1025 \r
1026         /// <summary>\r
1027         ///     Try to reserve and consume a message from list of saved message ids.\r
1028         /// </summary>\r
1029         /// <returns>\r
1030         ///     True if a message was sucessfully consumed, false otherwise.\r
1031         /// </returns>\r
1032         bool try_consume_msg()\r
1033         {\r
1034             runtime_object_identity _ReservedId = -1;\r
1035             ISource<_Type> * _PSource = NULL;\r
1036 \r
1037             // Walk through source links seeing if any saved ids exist.\r
1038             bool _ConsumedMsg = true;\r
1039             while(_ConsumedMsg)\r
1040             {\r
1041                 source_iterator _Iter = _M_connectedSources.begin();\r
1042                 {\r
1043                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
1044                     for (; *_Iter != NULL; ++_Iter)\r
1045                     {\r
1046                         _PSource = *_Iter;\r
1047                         std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;\r
1048                         if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())\r
1049                         {\r
1050                             _ReservedId = _MapIter->second;\r
1051                             _M_savedSourceMsgIds.erase(_MapIter);\r
1052                             break;\r
1053                         }\r
1054                     }\r
1055                 }\r
1056 \r
1057                 // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.\r
1058                 if(_ReservedId != -1)\r
1059                 {\r
1060                     if(_PSource->reserve(_ReservedId, this))\r
1061                     {\r
1062                         message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);\r
1063                         async_send(_ConsumedMsg);\r
1064                         return true;\r
1065                     }\r
1066                     // Reserve failed go or link was removed, \r
1067                     // go back and try and find a different msg id.\r
1068                     else\r
1069                     {\r
1070                         continue;\r
1071                     }\r
1072                 }\r
1073 \r
1074                 // If this point is reached the map of source ids was empty.\r
1075                 break;\r
1076             }\r
1077 \r
1078             return false;\r
1079         }\r
1080 \r
1081         /// <summary>\r
1082         ///     Reserves a message previously offered by the source.\r
1083         /// </summary>\r
1084         /// <param name="_MsgId">\r
1085         ///     The runtime object identity of the message.\r
1086         /// </param>\r
1087         /// <returns>\r
1088         ///     A Boolean indicating whether the reservation worked or not.\r
1089         /// </returns>\r
1090         /// <remarks>\r
1091         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
1092         /// </remarks>\r
1093         virtual bool reserve_message(runtime_object_identity _MsgId)\r
1094         {\r
1095             // Allow reservation if this is the head message\r
1096             return _M_messageBuffer.is_head(_MsgId);\r
1097         }\r
1098 \r
1099         /// <summary>\r
1100         ///     Consumes a message that was reserved previously.\r
1101         /// </summary>\r
1102         /// <param name="_MsgId">\r
1103         ///     The runtime object identity of the message.\r
1104         /// </param>\r
1105         /// <returns>\r
1106         ///     A pointer to the message that the caller now has ownership of.\r
1107         /// </returns>\r
1108         /// <remarks>\r
1109         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
1110         /// </remarks>\r
1111         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
1112         {\r
1113             // By default, accept the message\r
1114             return accept_message(_MsgId);\r
1115         }\r
1116 \r
1117         /// <summary>\r
1118         ///     Releases a previous message reservation.\r
1119         /// </summary>\r
1120         /// <param name="_MsgId">\r
1121         ///     The runtime object identity of the message.\r
1122         /// </param>\r
1123         virtual void release_message(runtime_object_identity _MsgId)\r
1124         {\r
1125             // The head message is the one reserved.\r
1126             if (!_M_messageBuffer.is_head(_MsgId))\r
1127             {\r
1128                 throw message_not_found();\r
1129             }\r
1130         }\r
1131 \r
1132         /// <summary>\r
1133         ///    Resumes propagation after a reservation has been released\r
1134         /// </summary>\r
1135         virtual void resume_propagation()\r
1136         {\r
1137             // If there are any messages in the buffer, propagate them out\r
1138             if (_M_messageBuffer.count() > 0)\r
1139             {\r
1140                 async_send(NULL);\r
1141             }\r
1142         }\r
1143 \r
1144         /// <summary>\r
1145         ///     Notification that a target was linked to this source.\r
1146         /// </summary>\r
1147         /// <param name="_PTarget">\r
1148         ///     A pointer to the newly linked target.\r
1149         /// </param>\r
1150         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
1151         {\r
1152             // If the message queue is blocked due to reservation\r
1153             // there is no need to do any message propagation\r
1154             if (_M_pReservedFor != NULL)\r
1155             {\r
1156                 return;\r
1157             }\r
1158 \r
1159             message<_Type> * _Msg = _M_messageBuffer.peek();\r
1160 \r
1161             if (_Msg != NULL)\r
1162             {\r
1163                 // Propagate the head message to the new target\r
1164                 message_status _Status = _PTarget->propagate(_Msg, this);\r
1165 \r
1166                 if (_Status == accepted)\r
1167                 {\r
1168                     // The target accepted the message, restart propagation.\r
1169                     propagate_to_any_targets(NULL);\r
1170                 }\r
1171 \r
1172                 // If the status is anything other than accepted, then leave\r
1173                 // the message queue blocked.\r
1174             }\r
1175         }\r
1176 \r
1177         /// <summary>\r
1178         ///     Takes the message and propagates it to all the targets of this bounded_buffer.\r
1179         ///     This is called from async_send.\r
1180         /// </summary>\r
1181         /// <param name="_PMessage">\r
1182         ///     A pointer to a new message.\r
1183         /// </param>\r
1184         virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
1185         {\r
1186             // Enqueue pMessage to the internal message buffer if it is non-NULL.\r
1187             // pMessage can be NULL if this LWT was the result of a Repropagate call\r
1188             // out of a Consume or Release (where no new message is queued up, but\r
1189             // everything remaining in the bounded buffer needs to be propagated out)\r
1190             if (_PMessage != NULL)\r
1191             {\r
1192                 _M_messageBuffer.enqueue(_PMessage);\r
1193 \r
1194                 // If the incoming pMessage is not the head message, we can safely assume that\r
1195                 // the head message is blocked and waiting on Consume(), Release() or a new\r
1196                 // link_target() and cannot be propagated out.\r
1197                 if (_M_messageBuffer.is_head(_PMessage->msg_id()))\r
1198                 {\r
1199                     _Propagate_priority_order();\r
1200                 }\r
1201             }\r
1202             else\r
1203             {\r
1204                 // While current size is less than capacity try to consume\r
1205                 // any previously offered ids.\r
1206                 bool _ConsumedMsg = true;\r
1207                 while(_ConsumedMsg)\r
1208                 {\r
1209                     // Assume a message will be found to successfully consume in the\r
1210                     // saved ids, if not this will be decremented afterwards.\r
1211                     if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
1212                     {\r
1213                         break;\r
1214                     }\r
1215 \r
1216                     _ConsumedMsg = try_consume_msg();\r
1217                 }\r
1218 \r
1219                 // Decrement the current size, we broke out of the previous loop\r
1220                 // because we reached capacity or there were no more messages to consume.\r
1221                 _InterlockedDecrement(&_M_currentSize);\r
1222             }\r
1223         }\r
1224 \r
1225     private:\r
1226 \r
1227         /// <summary>\r
1228         ///     Attempts to propagate out any messages currently in the block.\r
1229         /// </summary>\r
1230         void _Propagate_priority_order()\r
1231         {\r
1232             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
1233 \r
1234             // If someone has reserved the _Head message, don't propagate anymore\r
1235             if (_M_pReservedFor != NULL)\r
1236             {\r
1237                 return;\r
1238             }\r
1239 \r
1240             while (_Msg != NULL)\r
1241             {\r
1242                 message_status _Status = declined;\r
1243 \r
1244                 // Always start from the first target that linked.\r
1245                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
1246                 {\r
1247                     ITarget<_Target_type> * _PTarget = *_Iter;\r
1248                     _Status = _PTarget->propagate(_Msg, this);\r
1249 \r
1250                     // Ownership of message changed. Do not propagate this\r
1251                     // message to any other target.\r
1252                     if (_Status == accepted)\r
1253                     {\r
1254                         break;\r
1255                     }\r
1256 \r
1257                     // If the target just propagated to reserved this message, stop\r
1258                     // propagating it to others.\r
1259                     if (_M_pReservedFor != NULL)\r
1260                     {\r
1261                         break;\r
1262                     }\r
1263                 }\r
1264 \r
1265                 // If status is anything other than accepted, then the head message\r
1266                 // was not propagated out.  Thus, nothing after it in the queue can\r
1267                 // be propagated out.  Cease propagation.\r
1268                 if (_Status != accepted)\r
1269                 {\r
1270                     break;\r
1271                 }\r
1272 \r
1273                 // Get the next message\r
1274                 _Msg = _M_messageBuffer.peek();\r
1275             }\r
1276         }\r
1277 \r
1278         /// <summary>\r
1279         ///     Message buffer used to store messages.\r
1280         /// </summary>\r
1281         MessageQueue<_Type> _M_messageBuffer;\r
1282 \r
1283         /// <summary>\r
1284         ///     Maximum number of messages bounded_buffer can hold.\r
1285         /// </summary>\r
1286         const size_t _M_capacity;\r
1287 \r
1288         /// <summary>\r
1289         ///     Current number of messages in bounded_buffer.\r
1290         /// </summary>\r
1291         volatile long _M_currentSize;\r
1292 \r
1293         /// <summary>\r
1294         ///     Lock used to guard saved message ids map.\r
1295         /// </summary>\r
1296         critical_section _M_savedIdsLock;\r
1297 \r
1298         /// <summary>\r
1299         ///     Map of source links to saved message ids.\r
1300         /// </summary>\r
1301         std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;\r
1302 \r
1303         //\r
1304         // Hide assignment operator and copy constructor\r
1305         //\r
1306         bounded_buffer const &operator =(bounded_buffer const&);  // no assignment operator\r
1307         bounded_buffer(bounded_buffer const &);                   // no copy constructor\r
1308     };\r
1309 \r
1310     /// <summary>\r
1311     ///        A simple alternator, offers messages in order to each target\r
1312     ///     one at a time. If a consume occurs a message won't be offered to that target again\r
1313     ///     until all others are given a chance. This causes messages to be distributed more\r
1314     ///     evenly among targets.\r
1315     /// </summary>\r
1316     /// <typeparam name="_Type">\r
1317     ///     The payload type of messages stored and propagated by the buffer.\r
1318     /// </typeparam>\r
1319     template<class _Type>\r
1320     class alternator : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
1321     {\r
1322     public:\r
1323         /// <summary>\r
1324         ///     Create an alternator within the default scheduler, and places it any schedule\r
1325         ///     group of the scheduler\92s choosing.\r
1326         /// </summary>\r
1327         alternator()\r
1328             : _M_indexNextTarget(0)\r
1329         {\r
1330             initialize_source_and_target();\r
1331         }\r
1332 \r
1333         /// <summary>\r
1334         ///     Creates an alternator within the default scheduler, and places it any schedule\r
1335         ///     group of the scheduler\92s choosing.\r
1336         /// </summary>\r
1337         /// <param name="_Filter">\r
1338         ///     A reference to a filter function.\r
1339         /// </param>\r
1340         alternator(filter_method const& _Filter)\r
1341             : _M_indexNextTarget(0)\r
1342         {\r
1343             initialize_source_and_target();\r
1344             register_filter(_Filter);\r
1345         }\r
1346 \r
1347         /// <summary>\r
1348         ///     Creates an alternator within the specified scheduler, and places it any schedule\r
1349         ///     group of the scheduler\92s choosing.\r
1350         /// </summary>\r
1351         /// <param name="_PScheduler">\r
1352         ///     A reference to a scheduler instance.\r
1353         /// </param>\r
1354         alternator(Scheduler& _PScheduler)\r
1355             : _M_indexNextTarget(0)\r
1356         {\r
1357             initialize_source_and_target(&_PScheduler);\r
1358         }\r
1359 \r
1360         /// <summary>\r
1361         ///     Creates an alternator within the specified scheduler, and places it any schedule\r
1362         ///     group of the scheduler\92s choosing.\r
1363         /// </summary>\r
1364         /// <param name="_PScheduler">\r
1365         ///     A reference to a scheduler instance.\r
1366         /// </param>\r
1367         /// <param name="_Filter">\r
1368         ///     A reference to a filter function.\r
1369         /// </param>\r
1370         alternator(Scheduler& _PScheduler, filter_method const& _Filter) \r
1371             : _M_indexNextTarget(0)\r
1372         {\r
1373             initialize_source_and_target(&_PScheduler);\r
1374             register_filter(_Filter);\r
1375         }\r
1376 \r
1377         /// <summary>\r
1378         ///     Creates an alternator within the specified schedule group.  The scheduler is implied\r
1379         ///     by the schedule group.\r
1380         /// </summary>\r
1381         /// <param name="_PScheduleGroup">\r
1382         ///     A reference to a schedule group.\r
1383         /// </param>\r
1384         alternator(ScheduleGroup& _PScheduleGroup)\r
1385             : _M_indexNextTarget(0)\r
1386         {\r
1387             initialize_source_and_target(NULL, &_PScheduleGroup);\r
1388         }\r
1389 \r
1390         /// <summary>\r
1391         ///     Creates an alternator within the specified schedule group.  The scheduler is implied\r
1392         ///     by the schedule group.\r
1393         /// </summary>\r
1394         /// <param name="_PScheduleGroup">\r
1395         ///     A reference to a schedule group.\r
1396         /// </param>\r
1397         /// <param name="_Filter">\r
1398         ///     A reference to a filter function.\r
1399         /// </param>\r
1400         alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
1401             : _M_indexNextTarget(0)\r
1402         {\r
1403             initialize_source_and_target(NULL, &_PScheduleGroup);\r
1404             register_filter(_Filter);\r
1405         }\r
1406 \r
1407         /// <summary>\r
1408         ///     Cleans up any resources that may have been created by the alternator.\r
1409         /// </summary>\r
1410         ~alternator()\r
1411         {\r
1412             // Remove all links\r
1413             remove_network_links();\r
1414         }\r
1415 \r
1416     protected:\r
1417 \r
1418         /// <summary>\r
1419         ///     The main propagate() function for ITarget blocks.  Called by a source\r
1420         ///     block, generally within an asynchronous task to send messages to its targets.\r
1421         /// </summary>\r
1422         /// <param name="_PMessage">\r
1423         ///     A pointer to the message\r
1424         /// </param>\r
1425         /// <param name="_PSource">\r
1426         ///     A pointer to the source block offering the message.\r
1427         /// </param>\r
1428         /// <returns>\r
1429         ///     An indication of what the target decided to do with the message.\r
1430         /// </returns>\r
1431         /// <remarks>\r
1432         ///     It is important that calls to propagate do *not* take the same lock on the\r
1433         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
1434         ///     result in a deadlock with the Consume call.  (in the case of the alternator,\r
1435         ///     this lock is the m_internalLock)\r
1436         /// </remarks>\r
1437         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
1438         {\r
1439             message_status _Result = accepted;\r
1440             //\r
1441             // Accept the message being propagated\r
1442             // Note: depending on the source block propagating the message\r
1443             // this may not necessarily be the same message (pMessage) first\r
1444             // passed into the function.\r
1445             //\r
1446             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
1447 \r
1448             if (_PMessage != NULL)\r
1449             {\r
1450                 async_send(_PMessage);\r
1451             }\r
1452             else\r
1453             {\r
1454                 _Result = missed;\r
1455             }\r
1456 \r
1457             return _Result;\r
1458         }\r
1459 \r
1460         /// <summary>\r
1461         ///     Synchronously sends a message to this block.  When this function completes the message will\r
1462         ///     already have propagated into the block.\r
1463         /// </summary>\r
1464         /// <param name="_PMessage">\r
1465         ///     A pointer to the message.\r
1466         /// </param>\r
1467         /// <param name="_PSource">\r
1468         ///     A pointer to the source block offering the message.\r
1469         /// </param>\r
1470         /// <returns>\r
1471         ///     An indication of what the target decided to do with the message.\r
1472         /// </returns>\r
1473         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
1474         {\r
1475             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
1476 \r
1477             if (_PMessage != NULL)\r
1478             {\r
1479                 sync_send(_PMessage);\r
1480             }\r
1481             else\r
1482             {\r
1483                 return missed;\r
1484             }\r
1485 \r
1486             return accepted;\r
1487         }\r
1488 \r
1489         /// <summary>\r
1490         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
1491         /// </summary>\r
1492         /// <param name="_MsgId">\r
1493         ///     The runtime object identity of the message.\r
1494         /// </param>\r
1495         /// <returns>\r
1496         ///     A pointer to the message that the caller now has ownership of.\r
1497         /// </returns>\r
1498         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
1499         {\r
1500             //\r
1501             // Peek at the head message in the message buffer.  If the Ids match\r
1502             // dequeue and transfer ownership\r
1503             //\r
1504             message<_Type> * _Msg = NULL;\r
1505 \r
1506             if (_M_messageBuffer.is_head(_MsgId))\r
1507             {\r
1508                 _Msg = _M_messageBuffer.dequeue();\r
1509             }\r
1510 \r
1511             return _Msg;\r
1512         }\r
1513 \r
1514         /// <summary>\r
1515         ///     Reserves a message previously offered by the source.\r
1516         /// </summary>\r
1517         /// <param name="_MsgId">\r
1518         ///     The runtime object identity of the message.\r
1519         /// </param>\r
1520         /// <returns>\r
1521         ///     A Boolean indicating whether the reservation worked or not.\r
1522         /// </returns>\r
1523         /// <remarks>\r
1524         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
1525         /// </remarks>\r
1526         virtual bool reserve_message(runtime_object_identity _MsgId)\r
1527         {\r
1528             // Allow reservation if this is the head message\r
1529             return _M_messageBuffer.is_head(_MsgId);\r
1530         }\r
1531 \r
1532         /// <summary>\r
1533         ///     Consumes a message that was reserved previously.\r
1534         /// </summary>\r
1535         /// <param name="_MsgId">\r
1536         ///     The runtime object identity of the message.\r
1537         /// </param>\r
1538         /// <returns>\r
1539         ///     A pointer to the message that the caller now has ownership of.\r
1540         /// </returns>\r
1541         /// <remarks>\r
1542         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
1543         /// </remarks>\r
1544         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
1545         {\r
1546             // Update so we don't offer to this target again until\r
1547             // all others have a chance.\r
1548             target_iterator _CurrentIter = _M_connectedTargets.begin();\r
1549             for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i) \r
1550             {\r
1551                 if(*_CurrentIter == _M_pReservedFor)\r
1552                 {\r
1553                     _M_indexNextTarget = i + 1;\r
1554                     break;\r
1555                 }\r
1556             }\r
1557 \r
1558             // By default, accept the message\r
1559             return accept_message(_MsgId);\r
1560         }\r
1561 \r
1562         /// <summary>\r
1563         ///     Releases a previous message reservation.\r
1564         /// </summary>\r
1565         /// <param name="_MsgId">\r
1566         ///     The runtime object identity of the message.\r
1567         /// </param>\r
1568         virtual void release_message(runtime_object_identity _MsgId)\r
1569         {\r
1570             // The head message is the one reserved.\r
1571             if (!_M_messageBuffer.is_head(_MsgId))\r
1572             {\r
1573                 throw message_not_found();\r
1574             }\r
1575         }\r
1576 \r
1577         /// <summary>\r
1578         ///    Resumes propagation after a reservation has been released.\r
1579         /// </summary>\r
1580         virtual void resume_propagation()\r
1581         {\r
1582             // If there are any messages in the buffer, propagate them out\r
1583             if (_M_messageBuffer.count() > 0)\r
1584             {\r
1585                 async_send(NULL);\r
1586             }\r
1587         }\r
1588 \r
1589         /// <summary>\r
1590         ///     Notification that a target was linked to this source.\r
1591         /// </summary>\r
1592         /// <param name="_PTarget">\r
1593         ///     A pointer to the newly linked target.\r
1594         /// </param>\r
1595         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
1596         {\r
1597             // If the message queue is blocked due to reservation\r
1598             // there is no need to do any message propagation\r
1599             if (_M_pReservedFor != NULL)\r
1600             {\r
1601                 return;\r
1602             }\r
1603 \r
1604             message<_Type> * _Msg = _M_messageBuffer.peek();\r
1605 \r
1606             if (_Msg != NULL)\r
1607             {\r
1608                 // Propagate the head message to the new target\r
1609                 message_status _Status = _PTarget->propagate(_Msg, this);\r
1610 \r
1611                 if (_Status == accepted)\r
1612                 {\r
1613                     // The target accepted the message, restart propagation.\r
1614                     propagate_to_any_targets(NULL);\r
1615                 }\r
1616 \r
1617                 // If the status is anything other than accepted, then leave\r
1618                 // the message queue blocked.\r
1619             }\r
1620         }\r
1621 \r
1622         /// <summary>\r
1623         ///     Takes the message and propagates it to all the targets of this alternator.\r
1624         ///     This is called from async_send.\r
1625         /// </summary>\r
1626         /// <param name="_PMessage">\r
1627         ///     A pointer to a new message.\r
1628         /// </param>\r
1629         virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
1630         {\r
1631             // Enqueue pMessage to the internal buffer queue if it is non-NULL.\r
1632             // pMessage can be NULL if this LWT was the result of a Repropagate call\r
1633             // out of a Consume or Release (where no new message is queued up, but\r
1634             // everything remaining in the unbounded buffer needs to be propagated out)\r
1635             if (_PMessage != NULL)\r
1636             {\r
1637                 _M_messageBuffer.enqueue(_PMessage);\r
1638 \r
1639                 // If the incoming pMessage is not the head message, we can safely assume that\r
1640                 // the head message is blocked and waiting on Consume(), Release() or a new\r
1641                 // link_target()\r
1642                 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))\r
1643                 {\r
1644                     return;\r
1645                 }\r
1646             }\r
1647 \r
1648             // Attempt to propagate messages to targets in order last left off.\r
1649             _Propagate_alternating_order();\r
1650         }\r
1651 \r
1652         /// <summary>\r
1653         ///     Offers messages to targets in alternating order to help distribute messages\r
1654         ///     evenly among targets.\r
1655         /// </summary>\r
1656         void _Propagate_alternating_order()\r
1657         {\r
1658             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
1659 \r
1660             // If someone has reserved the _Head message, don't propagate anymore\r
1661             if (_M_pReservedFor != NULL)\r
1662             {\r
1663                 return;\r
1664             }\r
1665 \r
1666             //\r
1667             // Try to start where left off before, if the link has been removed\r
1668             // or this is the first time then start at the beginning.\r
1669             //\r
1670             target_iterator _CurrentIter = _M_connectedTargets.begin();\r
1671             const target_iterator _FirstLinkIter(_CurrentIter);\r
1672             for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {}\r
1673 \r
1674             while (_Msg != NULL)\r
1675             {\r
1676                 message_status _Status = declined;\r
1677 \r
1678                 // Loop offering message until end of links is reached.\r
1679                 target_iterator _StartedIter(_CurrentIter);\r
1680                 for(;*_CurrentIter != NULL; ++_CurrentIter)\r
1681                 {\r
1682                     _Status = (*_CurrentIter)->propagate(_Msg, this);\r
1683                     ++_M_indexNextTarget;\r
1684 \r
1685                     // Ownership of message changed. Do not propagate this\r
1686                     // message to any other target.\r
1687                     if (_Status == accepted)\r
1688                     {\r
1689                         ++_CurrentIter;\r
1690                         break;\r
1691                     }\r
1692 \r
1693                     // If the target just propagated to reserved this message, stop\r
1694                     // propagating it to others\r
1695                     if (_M_pReservedFor != NULL)\r
1696                     {\r
1697                         return;\r
1698                     }\r
1699                 }\r
1700 \r
1701                 // Message ownership changed go to next messages.\r
1702                 if (_Status == accepted)\r
1703                 {\r
1704                     continue;\r
1705                 }\r
1706 \r
1707                 // Try starting from the beginning until the first link offering was started at.\r
1708                 _M_indexNextTarget = 0;\r
1709                 for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter)\r
1710                 {\r
1711                     // I have offered the same message to all links now so stop.\r
1712                     if(*_CurrentIter == *_StartedIter)\r
1713                     {\r
1714                         break;\r
1715                     }\r
1716 \r
1717                     _Status = (*_CurrentIter)->propagate(_Msg, this);\r
1718                     ++_M_indexNextTarget;\r
1719 \r
1720                     // Ownership of message changed. Do not propagate this\r
1721                     // message to any other target.\r
1722                     if (_Status == accepted)\r
1723                     {\r
1724                         ++_CurrentIter;\r
1725                         break;\r
1726                     }\r
1727 \r
1728                     // If the target just propagated to reserved this message, stop\r
1729                     // propagating it to others\r
1730                     if (_M_pReservedFor != NULL)\r
1731                     {\r
1732                         return;\r
1733                     }\r
1734                 }\r
1735 \r
1736                 // If status is anything other than accepted, then the head message\r
1737                 // was not propagated out.  Thus, nothing after it in the queue can\r
1738                 // be propagated out.  Cease propagation.\r
1739                 if (_Status != accepted)\r
1740                 {\r
1741                     break;\r
1742                 }\r
1743 \r
1744                 // Get the next message\r
1745                 _Msg = _M_messageBuffer.peek();\r
1746             }\r
1747         }\r
1748 \r
1749     private:\r
1750 \r
1751         /// <summary>\r
1752         ///     Message queue used to store messages.\r
1753         /// </summary>\r
1754         MessageQueue<_Type> _M_messageBuffer;\r
1755 \r
1756         /// <summary>\r
1757         ///     Index of next target to call propagate on. Used to alternate and load\r
1758         ///     balance message offering.\r
1759         /// </summary>\r
1760         size_t _M_indexNextTarget;\r
1761 \r
1762         //\r
1763         // Hide assignment operator and copy constructor.\r
1764         //\r
1765         alternator const &operator =(alternator const&);  // no assignment operator\r
1766         alternator(alternator const &);                   // no copy constructor\r
1767     };\r
1768 \r
1769     #include <agents.h>\r
1770         \r
1771     //\r
1772     // Sample block that combines join and transform.\r
1773     //\r
1774     template<class _Input, class _Output, join_type _Jtype = non_greedy>\r
1775     class join_transform : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
1776     {\r
1777     public:\r
1778 \r
1779         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
1780 \r
1781         /// <summary>\r
1782         ///     Create an join block within the default scheduler, and places it any schedule\r
1783         ///     group of the scheduler\92s choosing.\r
1784         /// </summary>\r
1785         /// <param name="_NumInputs">\r
1786         ///     The number of inputs this join will be allowed\r
1787         /// </param>\r
1788         join_transform(size_t _NumInputs, _Transform_method const& _Func)\r
1789             : _M_messageArray(_NumInputs, 0),\r
1790               _M_savedMessageIdArray(_NumInputs, -1),\r
1791               _M_pFunc(_Func)\r
1792         {\r
1793             _Initialize(_NumInputs);\r
1794         }\r
1795 \r
1796         /// <summary>\r
1797         ///     Create an join block within the default scheduler, and places it any schedule\r
1798         ///     group of the scheduler\92s choosing.\r
1799         /// </summary>\r
1800         /// <param name="_NumInputs">\r
1801         ///     The number of inputs this join will be allowed\r
1802         /// </param>\r
1803         /// <param name="_Filter">\r
1804         ///     A filter method placed on this join\r
1805         /// </param>\r
1806         join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1807             : _M_messageArray(_NumInputs, 0),\r
1808               _M_savedMessageIdArray(_NumInputs, -1),\r
1809               _M_pFunc(_Func)\r
1810         {\r
1811             _Initialize(_NumInputs);\r
1812             register_filter(_Filter);\r
1813         }\r
1814 \r
1815         /// <summary>\r
1816         ///     Create an join block within the specified scheduler, and places it any schedule\r
1817         ///     group of the scheduler\92s choosing.\r
1818         /// </summary>\r
1819         /// <param name="_Scheduler">\r
1820         ///     The scheduler onto which the task's message propagation will be scheduled.\r
1821         /// </param>\r
1822         /// <param name="_NumInputs">\r
1823         ///     The number of inputs this join will be allowed\r
1824         /// </param>\r
1825         join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func)\r
1826             : _M_messageArray(_NumInputs, 0),\r
1827               _M_savedMessageIdArray(_NumInputs, -1),\r
1828               _M_pFunc(_Func)\r
1829         {\r
1830             _Initialize(_NumInputs, &_PScheduler);\r
1831         }\r
1832 \r
1833         /// <summary>\r
1834         ///     Create an join block within the specified scheduler, and places it any schedule\r
1835         ///     group of the scheduler\92s choosing.\r
1836         /// </summary>\r
1837         /// <param name="_Scheduler">\r
1838         ///     The scheduler onto which the task's message propagation will be scheduled.\r
1839         /// </param>\r
1840         /// <param name="_NumInputs">\r
1841         ///     The number of inputs this join will be allowed\r
1842         /// </param>\r
1843         /// <param name="_Filter">\r
1844         ///     A filter method placed on this join\r
1845         /// </param>\r
1846         join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1847             : _M_messageArray(_NumInputs, 0),\r
1848               _M_savedMessageIdArray(_NumInputs, -1),\r
1849               _M_pFunc(_Func)\r
1850         {\r
1851             _Initialize(_NumInputs, &_PScheduler);\r
1852             register_filter(_Filter);\r
1853         }\r
1854 \r
1855         /// <summary>\r
1856         ///     Create an join block within the specified schedule group.  The scheduler is implied\r
1857         ///     by the schedule group.\r
1858         /// </summary>\r
1859         /// <param name="_PScheduleGroup">\r
1860         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
1861         /// </param>\r
1862         /// <param name="_NumInputs">\r
1863         ///     The number of inputs this join will be allowed\r
1864         /// </param>\r
1865         join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func)\r
1866             : _M_messageArray(_NumInputs, 0),\r
1867               _M_savedMessageIdArray(_NumInputs, -1),\r
1868               _M_pFunc(_Func)\r
1869         {\r
1870             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
1871         }\r
1872 \r
1873         /// <summary>\r
1874         ///     Create an join block within the specified schedule group.  The scheduler is implied\r
1875         ///     by the schedule group.\r
1876         /// </summary>\r
1877         /// <param name="_PScheduleGroup">\r
1878         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
1879         /// </param>\r
1880         /// <param name="_NumInputs">\r
1881         ///     The number of inputs this join will be allowed\r
1882         /// </param>\r
1883         /// <param name="_Filter">\r
1884         ///     A filter method placed on this join\r
1885         /// </param>\r
1886         join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1887             : _M_messageArray(_NumInputs, 0),\r
1888               _M_savedMessageIdArray(_NumInputs, -1),\r
1889               _M_pFunc(_Func)\r
1890         {\r
1891             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
1892             register_filter(_Filter);\r
1893         }\r
1894 \r
1895         /// <summary>\r
1896         ///     Destroys a join\r
1897         /// </summary>\r
1898         ~join_transform()\r
1899         {\r
1900             // Remove all links that are targets of this join\r
1901             remove_network_links();\r
1902 \r
1903             delete [] _M_savedIdBuffer;\r
1904         }\r
1905 \r
1906     protected:\r
1907         //\r
1908         // propagator_block protected function implementations\r
1909         //\r
1910 \r
1911         /// <summary>\r
1912         ///     The main propagate() function for ITarget blocks.  Called by a source\r
1913         ///     block, generally within an asynchronous task to send messages to its targets.\r
1914         /// </summary>\r
1915         /// <param name="_PMessage">\r
1916         ///     The message being propagated\r
1917         /// </param>\r
1918         /// <param name="_PSource">\r
1919         ///     The source doing the propagation\r
1920         /// </param>\r
1921         /// <returns>\r
1922         ///     An indication of what the target decided to do with the message.\r
1923         /// </returns>\r
1924         /// <remarks>\r
1925         ///     It is important that calls to propagate do *not* take the same lock on the\r
1926         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
1927         ///     result in a deadlock with the Consume call. \r
1928         /// </remarks>\r
1929         message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) \r
1930         {\r
1931             message_status _Ret_val = accepted;\r
1932 \r
1933             //\r
1934             // Find the slot index of this source\r
1935             //\r
1936             size_t _Slot = 0;\r
1937             bool _Found = false;\r
1938             for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)\r
1939             {\r
1940                 if (*_Iter == _PSource)\r
1941                 {\r
1942                     _Found = true;\r
1943                     break;\r
1944                 }\r
1945 \r
1946                 _Slot++;\r
1947             }\r
1948 \r
1949             if (!_Found)\r
1950             {\r
1951                 // If this source was not found in the array, this is not a connected source\r
1952                 // decline the message\r
1953                 return declined;\r
1954             }\r
1955 \r
1956             _ASSERTE(_Slot < _M_messageArray.size());\r
1957 \r
1958             bool fIsGreedy = (_Jtype == greedy);\r
1959 \r
1960             if (fIsGreedy)\r
1961             {\r
1962                 //\r
1963                 // Greedy type joins immediately accept the message.\r
1964                 //\r
1965                 {\r
1966                     critical_section::scoped_lock lockHolder(_M_propagationLock);\r
1967                     if (_M_messageArray[_Slot] != NULL)\r
1968                     {\r
1969                         _M_savedMessageIdArray[_Slot] = _PMessage->msg_id();\r
1970                         _Ret_val = postponed;\r
1971                     }\r
1972                 }\r
1973 \r
1974                 if (_Ret_val != postponed)\r
1975                 {\r
1976                     _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this);\r
1977 \r
1978                     if (_M_messageArray[_Slot] != NULL)\r
1979                     {\r
1980                         if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
1981                         {\r
1982                             // If messages have arrived on all links, start a propagation\r
1983                             // of the current message\r
1984                             async_send(NULL);\r
1985                         }\r
1986                     }\r
1987                     else\r
1988                     {\r
1989                         _Ret_val = missed;\r
1990                     }\r
1991                 }\r
1992             }\r
1993             else\r
1994             {\r
1995                 //\r
1996                 // Non-greedy type joins save the message ids until they have all arrived\r
1997                 //\r
1998 \r
1999                 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)\r
2000                 {\r
2001                     // Decrement the message remaining count if this thread is switching \r
2002                     // the saved id from -1 to a valid value.\r
2003                     if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
2004                     {\r
2005                         async_send(NULL);\r
2006                     }\r
2007                 }\r
2008 \r
2009                 // Always return postponed.  This message will be consumed\r
2010                 // in the LWT\r
2011                 _Ret_val = postponed;\r
2012             }\r
2013 \r
2014             return _Ret_val;\r
2015         }\r
2016 \r
2017         /// <summary>\r
2018         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
2019         /// </summary>\r
2020         /// <param name="_MsgId">\r
2021         ///     The runtime object identity of the message.\r
2022         /// </param>\r
2023         /// <returns>\r
2024         ///     A pointer to the message that the caller now has ownership of.\r
2025         /// </returns>\r
2026         virtual message<_Output> * accept_message(runtime_object_identity _MsgId)\r
2027         {\r
2028             //\r
2029             // Peek at the head message in the message buffer.  If the Ids match\r
2030             // dequeue and transfer ownership\r
2031             //\r
2032             message<_Output> * _Msg = NULL;\r
2033 \r
2034             if (_M_messageBuffer.is_head(_MsgId))\r
2035             {\r
2036                 _Msg = _M_messageBuffer.dequeue();\r
2037             }\r
2038 \r
2039             return _Msg;\r
2040         }\r
2041 \r
2042         /// <summary>\r
2043         ///     Reserves a message previously offered by the source.\r
2044         /// </summary>\r
2045         /// <param name="_MsgId">\r
2046         ///     The runtime object identity of the message.\r
2047         /// </param>\r
2048         /// <returns>\r
2049         ///     A bool indicating whether the reservation worked or not\r
2050         /// </returns>\r
2051         /// <remarks>\r
2052         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
2053         /// </remarks>\r
2054         virtual bool reserve_message(runtime_object_identity _MsgId)\r
2055         {\r
2056             // Allow reservation if this is the head message\r
2057             return _M_messageBuffer.is_head(_MsgId);\r
2058         }\r
2059 \r
2060         /// <summary>\r
2061         ///     Consumes a message previously offered by the source and reserved by the target, \r
2062         ///     transferring ownership to the caller.\r
2063         /// </summary>\r
2064         /// <param name="_MsgId">\r
2065         ///     The runtime object identity of the message.\r
2066         /// </param>\r
2067         /// <returns>\r
2068         ///     A pointer to the message that the caller now has ownership of.\r
2069         /// </returns>\r
2070         /// <remarks>\r
2071         ///     Similar to 'accept', but is always preceded by a call to 'reserve'\r
2072         /// </remarks>\r
2073         virtual message<_Output> * consume_message(runtime_object_identity _MsgId)\r
2074         {\r
2075             // By default, accept the message\r
2076             return accept_message(_MsgId);\r
2077         }\r
2078 \r
2079         /// <summary>\r
2080         ///     Releases a previous message reservation.\r
2081         /// </summary>\r
2082         /// <param name="_MsgId">\r
2083         ///     The runtime object identity of the message.\r
2084         /// </param>\r
2085         virtual void release_message(runtime_object_identity _MsgId)\r
2086         {\r
2087             // The head message is the one reserved.\r
2088             if (!_M_messageBuffer.is_head(_MsgId))\r
2089             {\r
2090                 throw message_not_found();\r
2091             }\r
2092         }\r
2093 \r
2094         /// <summary>\r
2095         ///     Resumes propagation after a reservation has been released\r
2096         /// </summary>\r
2097         virtual void resume_propagation()\r
2098         {\r
2099             // If there are any messages in the buffer, propagate them out\r
2100             if (_M_messageBuffer.count() > 0)\r
2101             {\r
2102                 async_send(NULL);\r
2103             }\r
2104         }\r
2105 \r
2106         /// <summary>\r
2107         ///     Notification that a target was linked to this source.\r
2108         /// </summary>\r
2109         /// <param name="_PTarget">\r
2110         ///     A pointer to the newly linked target.\r
2111         /// </param>\r
2112         virtual void link_target_notification(ITarget<_Output> *)\r
2113         {\r
2114             // If the message queue is blocked due to reservation\r
2115             // there is no need to do any message propagation\r
2116             if (_M_pReservedFor != NULL)\r
2117             {\r
2118                 return;\r
2119             }\r
2120 \r
2121             _Propagate_priority_order(_M_messageBuffer);\r
2122         }\r
2123 \r
2124         /// <summary>\r
2125         ///     Takes the message and propagates it to all the target of this join.\r
2126         ///     This is called from async_send.\r
2127         /// </summary>\r
2128         /// <param name="_PMessage">\r
2129         ///     The message being propagated\r
2130         /// </param>\r
2131         void propagate_to_any_targets(message<_Output> *) \r
2132         {\r
2133             message<_Output> * _Msg = NULL;\r
2134             // Create a new message from the input sources\r
2135             // If messagesRemaining == 0, we have a new message to create.  Otherwise, this is coming from\r
2136             // a consume or release from the target.  In that case we don't want to create a new message.\r
2137             if (_M_messagesRemaining == 0)\r
2138             {\r
2139                 // A greedy join can immediately create the message, a non-greedy\r
2140                 // join must try and consume all the messages it has postponed\r
2141                 _Msg = _Create_new_message();\r
2142             }\r
2143 \r
2144             if (_Msg == NULL)\r
2145             {\r
2146                 // Create message failed.  This happens in non_greedy joins when the\r
2147                 // reserve/consumption of a postponed message failed.\r
2148                 _Propagate_priority_order(_M_messageBuffer);\r
2149                 return;\r
2150             }\r
2151 \r
2152             bool fIsGreedy = (_Jtype == greedy);\r
2153 \r
2154             // For a greedy join, reset the number of messages remaining\r
2155             // Check to see if multiple messages have been passed in on any of the links,\r
2156             // and postponed. If so, try and reserve/consume them now\r
2157             if (fIsGreedy)\r
2158             {\r
2159                 // Look at the saved ids and reserve/consume any that have passed in while\r
2160                 // this join was waiting to complete\r
2161                 _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size());\r
2162 \r
2163                 for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2164                 {\r
2165                     for(;;)\r
2166                     {\r
2167                         runtime_object_identity _Saved_id;\r
2168                         // Grab the current saved id value.  This value could be changing from based on any\r
2169                         // calls of source->propagate(this).  If the message id is different than what is snapped\r
2170                         // here, that means, the reserve below must fail.  This is because reserve is trying\r
2171                         // to get the same source lock the propagate(this) call must be holding.\r
2172                         {\r
2173                             critical_section::scoped_lock lockHolder(_M_propagationLock);\r
2174 \r
2175                             _ASSERTE(_M_messageArray[i] != NULL);\r
2176 \r
2177                             _Saved_id = _M_savedMessageIdArray[i];\r
2178 \r
2179                             if (_Saved_id == -1)\r
2180                             {\r
2181                                 _M_messageArray[i] = NULL;\r
2182                                 break;\r
2183                             }\r
2184                             else\r
2185                             {\r
2186                                 _M_savedMessageIdArray[i] = -1;\r
2187                             }\r
2188                         }\r
2189 \r
2190                         if (_Saved_id != -1)\r
2191                         {\r
2192                             source_iterator _Iter = _M_connectedSources.begin();\r
2193                             \r
2194                             ISource<_Input> * _PSource = _Iter[i];\r
2195                             if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))\r
2196                             {\r
2197                                 _M_messageArray[i] = _PSource->consume(_Saved_id, this);\r
2198                                 _InterlockedDecrementSizeT(&_M_messagesRemaining);\r
2199                                 break;\r
2200                             }\r
2201                         }\r
2202                     }\r
2203                 }\r
2204 \r
2205                 // If messages have all been received, async_send again, this will start the\r
2206                 // LWT up to create a new message\r
2207                 if (_M_messagesRemaining == 0)\r
2208                 {\r
2209                     async_send(NULL);\r
2210                 }\r
2211             }\r
2212             \r
2213             // Add the new message to the outbound queue\r
2214             _M_messageBuffer.enqueue(_Msg);\r
2215 \r
2216             if (!_M_messageBuffer.is_head(_Msg->msg_id()))\r
2217             {\r
2218                 // another message is at the head of the outbound message queue and blocked\r
2219                 // simply return\r
2220                 return;\r
2221             }\r
2222 \r
2223             _Propagate_priority_order(_M_messageBuffer);\r
2224         }\r
2225 \r
2226     private:\r
2227 \r
2228         //\r
2229         //  Private Methods\r
2230         //\r
2231 \r
2232         /// <summary>\r
2233         ///     Propagate messages in priority order\r
2234         /// </summary>\r
2235         /// <param name="_MessageBuffer">\r
2236         ///     Reference to a message queue with messages to be propagated\r
2237         /// </param>\r
2238         void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
2239         {\r
2240             message<_Output> * _Msg = _MessageBuffer.peek();\r
2241 \r
2242             // If someone has reserved the _Head message, don't propagate anymore\r
2243             if (_M_pReservedFor != NULL)\r
2244             {\r
2245                 return;\r
2246             }\r
2247 \r
2248             while (_Msg != NULL)\r
2249             {\r
2250                 message_status _Status = declined;\r
2251 \r
2252                 // Always start from the first target that linked\r
2253                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
2254                 {\r
2255                     ITarget<_Output> * _PTarget = *_Iter;\r
2256                     _Status = _PTarget->propagate(_Msg, this);\r
2257 \r
2258                     // Ownership of message changed. Do not propagate this\r
2259                     // message to any other target.\r
2260                     if (_Status == accepted)\r
2261                     {\r
2262                         break;\r
2263                     }\r
2264 \r
2265                     // If the target just propagated to reserved this message, stop\r
2266                     // propagating it to others\r
2267                     if (_M_pReservedFor != NULL)\r
2268                     {\r
2269                         break;\r
2270                     }\r
2271                 }\r
2272 \r
2273                 // If status is anything other than accepted, then the head message\r
2274                 // was not propagated out.  Thus, nothing after it in the queue can\r
2275                 // be propagated out.  Cease propagation.\r
2276                 if (_Status != accepted)\r
2277                 {\r
2278                     break;\r
2279                 }\r
2280 \r
2281                 // Get the next message\r
2282                 _Msg = _MessageBuffer.peek();\r
2283             }\r
2284         }\r
2285 \r
2286         /// <summary>\r
2287         ///     Create a new message from the data output\r
2288         /// </summary>\r
2289         /// <returns>\r
2290         ///     The created message (NULL if creation failed)\r
2291         /// </returns>\r
2292         message<_Output> * __cdecl _Create_new_message()\r
2293         {\r
2294             bool fIsNonGreedy = (_Jtype == non_greedy);\r
2295 \r
2296             // If this is a non-greedy join, check each source and try to consume their message\r
2297             if (fIsNonGreedy)\r
2298             {\r
2299 \r
2300                 // The iterator _Iter below will ensure that it is safe to touch\r
2301                 // non-NULL source pointers. Take a snapshot.\r
2302                 std::vector<ISource<_Input> *> _Sources;\r
2303                 source_iterator _Iter = _M_connectedSources.begin();\r
2304 \r
2305                 while (*_Iter != NULL)\r
2306                 {\r
2307                     ISource<_Input> * _PSource = *_Iter;\r
2308 \r
2309                     if (_PSource == NULL)\r
2310                     {\r
2311                         break;\r
2312                     }\r
2313 \r
2314                     _Sources.push_back(_PSource);\r
2315                     ++_Iter;\r
2316                 }\r
2317 \r
2318                 if (_Sources.size() != _M_messageArray.size())\r
2319                 {\r
2320                     // Some of the sources were unlinked. The join is broken\r
2321                     return NULL;\r
2322                 }\r
2323 \r
2324                 // First, try and reserve all the messages.  If a reservation fails,\r
2325                 // then release any reservations that had been made.\r
2326                 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)\r
2327                 {\r
2328                     // Snap the current saved id into a buffer.  This value can be changing behind the scenes from\r
2329                     // other source->propagate(msg, this) calls, but if so, that just means the reserve below will\r
2330                     // fail.\r
2331                     _InterlockedIncrementSizeT(&_M_messagesRemaining);\r
2332                     _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);\r
2333 \r
2334                     _ASSERTE(_M_savedIdBuffer[i] != -1);\r
2335 \r
2336                     if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))\r
2337                     {\r
2338                         // A reservation failed, release all reservations made up until\r
2339                         // this block, and wait for another message to arrive on this link\r
2340                         for (size_t j = 0; j < i; j++)\r
2341                         {\r
2342                             _Sources[j]->release(_M_savedIdBuffer[j], this);\r
2343                             if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1)\r
2344                             {\r
2345                                 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
2346                                 {\r
2347                                     async_send(NULL);\r
2348                                 }\r
2349                             }\r
2350                         }\r
2351 \r
2352                         // Return NULL to indicate that the create failed\r
2353                         return NULL;\r
2354                     }  \r
2355                 }\r
2356 \r
2357                 // Since everything has been reserved, consume all the messages.\r
2358                 // This is guaranteed to return true.\r
2359                 for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2360                 {\r
2361                     _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);\r
2362                     _M_savedIdBuffer[i] = -1;\r
2363                 }\r
2364             }\r
2365 \r
2366             if (!fIsNonGreedy)\r
2367             {\r
2368                 // Reinitialize how many messages are being waited for.\r
2369                 // This is safe because all messages have been received, thus no new async_sends for\r
2370                 // greedy joins can be called.\r
2371                 _M_messagesRemaining = _M_messageArray.size();\r
2372             }\r
2373 \r
2374             std::vector<_Input> _OutputVector;\r
2375             for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2376             {\r
2377                 _ASSERTE(_M_messageArray[i] != NULL);\r
2378                 _OutputVector.push_back(_M_messageArray[i]->payload);\r
2379 \r
2380                 delete _M_messageArray[i];\r
2381                 if (fIsNonGreedy)\r
2382                 {\r
2383                     _M_messageArray[i] = NULL;\r
2384                 }\r
2385             }\r
2386 \r
2387             _Output _Out = _M_pFunc(_OutputVector);\r
2388 \r
2389             return (new message<_Output>(_Out));\r
2390         }\r
2391 \r
2392         /// <summary>\r
2393         ///     Initialize the join block\r
2394         /// </summary>\r
2395         /// <param name="_NumInputs">\r
2396         ///     The number of inputs\r
2397         /// </param>\r
2398         void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
2399         {\r
2400             initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
2401 \r
2402             _M_connectedSources.set_bound(_NumInputs);\r
2403             _M_messagesRemaining = _NumInputs;\r
2404 \r
2405             bool fIsNonGreedy = (_Jtype == non_greedy);\r
2406 \r
2407             if (fIsNonGreedy)\r
2408             {\r
2409                 // Non greedy joins need a buffer to snap off saved message ids to.\r
2410                 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
2411                 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
2412             }\r
2413             else\r
2414             {\r
2415                 _M_savedIdBuffer = NULL;\r
2416             }\r
2417         }\r
2418 \r
2419         // The current number of messages remaining\r
2420         volatile size_t _M_messagesRemaining;\r
2421 \r
2422         // An array containing the accepted messages of this join.\r
2423         std::vector<message<_Input>*> _M_messageArray;\r
2424 \r
2425         // An array containing the msg ids of messages propagated to the array\r
2426         // For greedy joins, this contains a log of other messages passed to this\r
2427         // join after the first has been accepted\r
2428         // For non-greedy joins, this contains the message id of any message \r
2429         // passed to it.\r
2430         std::vector<runtime_object_identity> _M_savedMessageIdArray;\r
2431 \r
2432         // The transformer method called by this block\r
2433         _Transform_method _M_pFunc;\r
2434 \r
2435         // Buffer for snapping saved ids in non-greedy joins\r
2436         runtime_object_identity * _M_savedIdBuffer;\r
2437 \r
2438         // A lock for modifying the buffer or the connected blocks\r
2439         ::Concurrency::critical_section _M_propagationLock;\r
2440 \r
2441         // Queue to hold output messages\r
2442         MessageQueue<_Output> _M_messageBuffer;\r
2443     };\r
2444 \r
2445     //\r
2446     // Message block that invokes a transform method when it receives message on any of the input links.\r
2447     // A typical example is recal engine for a cell in an Excel spreadsheet.\r
2448     // (Remember that a normal join block is triggered only when it receives messages on all its input links).\r
2449     //\r
2450     template<class _Input, class _Output>\r
2451     class recalculate : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
2452     {\r
2453     public:\r
2454         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method;\r
2455 \r
2456         /// <summary>\r
2457         ///     Create an recalculate block within the default scheduler, and places it any schedule\r
2458         ///     group of the scheduler\92s choosing.\r
2459         /// </summary>\r
2460         /// <param name="_NumInputs">\r
2461         ///     The number of inputs \r
2462         /// </param>\r
2463         recalculate(size_t _NumInputs, _Recalculate_method const& _Func)\r
2464             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2465               _M_pFunc(_Func)\r
2466         {\r
2467             _Initialize(_NumInputs);\r
2468         }\r
2469 \r
2470         /// <summary>\r
2471         ///     Create an recalculate block within the default scheduler, and places it any schedule\r
2472         ///     group of the scheduler\92s choosing.\r
2473         /// </summary>\r
2474         /// <param name="_NumInputs">\r
2475         ///     The number of inputs \r
2476         /// </param>\r
2477         /// <param name="_Filter">\r
2478         ///     A filter method placed on this join\r
2479         /// </param>\r
2480         recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2481             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2482               _M_pFunc(_Func)\r
2483         {\r
2484             _Initialize(_NumInputs);\r
2485             register_filter(_Filter);\r
2486         }\r
2487 \r
2488         /// <summary>\r
2489         ///     Create an recalculate block within the specified scheduler, and places it any schedule\r
2490         ///     group of the scheduler\92s choosing.\r
2491         /// </summary>\r
2492         /// <param name="_Scheduler">\r
2493         ///     The scheduler onto which the task's message propagation will be scheduled.\r
2494         /// </param>\r
2495         /// <param name="_NumInputs">\r
2496         ///     The number of inputs \r
2497         /// </param>\r
2498         recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func)\r
2499             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2500               _M_pFunc(_Func)\r
2501         {\r
2502             _Initialize(_NumInputs, &_PScheduler);\r
2503         }\r
2504 \r
2505         /// <summary>\r
2506         ///     Create an recalculate block within the specified scheduler, and places it any schedule\r
2507         ///     group of the scheduler\92s choosing.\r
2508         /// </summary>\r
2509         /// <param name="_Scheduler">\r
2510         ///     The scheduler onto which the task's message propagation will be scheduled.\r
2511         /// </param>\r
2512         /// <param name="_NumInputs">\r
2513         ///     The number of inputs \r
2514         /// </param>\r
2515         /// <param name="_Filter">\r
2516         ///     A filter method placed on this join\r
2517         /// </param>\r
2518         recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2519             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2520               _M_pFunc(_Func)\r
2521         {\r
2522             _Initialize(_NumInputs, &_PScheduler);\r
2523             register_filter(_Filter);\r
2524         }\r
2525 \r
2526         /// <summary>\r
2527         ///     Create an recalculate block within the specified schedule group.  The scheduler is implied\r
2528         ///     by the schedule group.\r
2529         /// </summary>\r
2530         /// <param name="_PScheduleGroup">\r
2531         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
2532         /// </param>\r
2533         /// <param name="_NumInputs">\r
2534         ///     The number of inputs \r
2535         /// </param>\r
2536         recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func)\r
2537             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2538               _M_pFunc(_Func)\r
2539         {\r
2540             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
2541         }\r
2542 \r
2543         /// <summary>\r
2544         ///     Create an recalculate block within the specified schedule group.  The scheduler is implied\r
2545         ///     by the schedule group.\r
2546         /// </summary>\r
2547         /// <param name="_PScheduleGroup">\r
2548         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
2549         /// </param>\r
2550         /// <param name="_NumInputs">\r
2551         ///     The number of inputs \r
2552         /// </param>\r
2553         /// <param name="_Filter">\r
2554         ///     A filter method placed on this join\r
2555         /// </param>\r
2556         recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2557             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2558               _M_pFunc(_Func)\r
2559         {\r
2560             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
2561             register_filter(_Filter);\r
2562         }\r
2563 \r
2564         /// <summary>\r
2565         ///     Destroys a join\r
2566         /// </summary>\r
2567         ~recalculate()\r
2568         {\r
2569             // Remove all links that are targets of this join\r
2570             remove_network_links();\r
2571 \r
2572             delete [] _M_savedIdBuffer;\r
2573         }\r
2574 \r
2575     protected:\r
2576         //\r
2577         // propagator_block protected function implementations\r
2578         //\r
2579 \r
2580         /// <summary>\r
2581         ///     The main propagate() function for ITarget blocks.  Called by a source\r
2582         ///     block, generally within an asynchronous task to send messages to its targets.\r
2583         /// </summary>\r
2584         /// <param name="_PMessage">\r
2585         ///     The message being propagated\r
2586         /// </param>\r
2587         /// <param name="_PSource">\r
2588         ///     The source doing the propagation\r
2589         /// </param>\r
2590         /// <returns>\r
2591         ///     An indication of what the target decided to do with the message.\r
2592         /// </returns>\r
2593         /// <remarks>\r
2594         ///     It is important that calls to propagate do *not* take the same lock on the\r
2595         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
2596         ///     result in a deadlock with the Consume call. \r
2597         /// </remarks>\r
2598         message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) \r
2599         {\r
2600             //\r
2601             // Find the slot index of this source\r
2602             //\r
2603             size_t _Slot = 0;\r
2604             bool _Found = false;\r
2605             for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)\r
2606             {\r
2607                 if (*_Iter == _PSource)\r
2608                 {\r
2609                     _Found = true;\r
2610                     break;\r
2611                 }\r
2612 \r
2613                 _Slot++;\r
2614             }\r
2615 \r
2616             if (!_Found)\r
2617             {\r
2618                 // If this source was not found in the array, this is not a connected source\r
2619                 // decline the message\r
2620                 return declined;\r
2621             }\r
2622 \r
2623             //\r
2624             // Save the message id\r
2625             //\r
2626             if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)\r
2627             {\r
2628                 // If it is not seen by Create_message attempt a recalculate\r
2629                 async_send(NULL);\r
2630             }\r
2631 \r
2632             // Always return postponed.  This message will be consumed\r
2633             // in the LWT\r
2634             return postponed;\r
2635         }\r
2636 \r
2637         /// <summary>\r
2638         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
2639         /// </summary>\r
2640         /// <param name="_MsgId">\r
2641         ///     The runtime object identity of the message.\r
2642         /// </param>\r
2643         /// <returns>\r
2644         ///     A pointer to the message that the caller now has ownership of.\r
2645         /// </returns>\r
2646         virtual message<_Output> * accept_message(runtime_object_identity _MsgId)\r
2647         {\r
2648             //\r
2649             // Peek at the head message in the message buffer.  If the Ids match\r
2650             // dequeue and transfer ownership\r
2651             //\r
2652             message<_Output> * _Msg = NULL;\r
2653 \r
2654             if (_M_messageBuffer.is_head(_MsgId))\r
2655             {\r
2656                 _Msg = _M_messageBuffer.dequeue();\r
2657             }\r
2658 \r
2659             return _Msg;\r
2660         }\r
2661 \r
2662         /// <summary>\r
2663         ///     Reserves a message previously offered by the source.\r
2664         /// </summary>\r
2665         /// <param name="_MsgId">\r
2666         ///     The runtime object identity of the message.\r
2667         /// </param>\r
2668         /// <returns>\r
2669         ///     A bool indicating whether the reservation worked or not\r
2670         /// </returns>\r
2671         /// <remarks>\r
2672         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
2673         /// </remarks>\r
2674         virtual bool reserve_message(runtime_object_identity _MsgId)\r
2675         {\r
2676             // Allow reservation if this is the head message\r
2677             return _M_messageBuffer.is_head(_MsgId);\r
2678         }\r
2679 \r
2680         /// <summary>\r
2681         ///     Consumes a message previously offered by the source and reserved by the target, \r
2682         ///     transferring ownership to the caller.\r
2683         /// </summary>\r
2684         /// <param name="_MsgId">\r
2685         ///     The runtime object identity of the message.\r
2686         /// </param>\r
2687         /// <returns>\r
2688         ///     A pointer to the message that the caller now has ownership of.\r
2689         /// </returns>\r
2690         /// <remarks>\r
2691         ///     Similar to 'accept', but is always preceded by a call to 'reserve'\r
2692         /// </remarks>\r
2693         virtual message<_Output> * consume_message(runtime_object_identity _MsgId)\r
2694         {\r
2695             // By default, accept the message\r
2696             return accept_message(_MsgId);\r
2697         }\r
2698 \r
2699         /// <summary>\r
2700         ///     Releases a previous message reservation.\r
2701         /// </summary>\r
2702         /// <param name="_MsgId">\r
2703         ///     The runtime object identity of the message.\r
2704         /// </param>\r
2705         virtual void release_message(runtime_object_identity _MsgId)\r
2706         {\r
2707             // The head message is the one reserved.\r
2708             if (!_M_messageBuffer.is_head(_MsgId))\r
2709             {\r
2710                 throw message_not_found();\r
2711             }\r
2712         }\r
2713 \r
2714         /// <summary>\r
2715         ///     Resumes propagation after a reservation has been released\r
2716         /// </summary>\r
2717         virtual void resume_propagation()\r
2718         {\r
2719             // If there are any messages in the buffer, propagate them out\r
2720             if (_M_messageBuffer.count() > 0)\r
2721             {\r
2722                 async_send(NULL);\r
2723             }\r
2724         }\r
2725 \r
2726         /// <summary>\r
2727         ///     Notification that a target was linked to this source.\r
2728         /// </summary>\r
2729         /// <param name="_PTarget">\r
2730         ///     A pointer to the newly linked target.\r
2731         /// </param>\r
2732         virtual void link_target_notification(ITarget<_Output> *)\r
2733         {\r
2734             // If the message queue is blocked due to reservation\r
2735             // there is no need to do any message propagation\r
2736             if (_M_pReservedFor != NULL)\r
2737             {\r
2738                 return;\r
2739             }\r
2740 \r
2741             _Propagate_priority_order(_M_messageBuffer);\r
2742         }\r
2743 \r
2744         /// <summary>\r
2745         ///     Takes the message and propagates it to all the target of this join.\r
2746         ///     This is called from async_send.\r
2747         /// </summary>\r
2748         /// <param name="_PMessage">\r
2749         ///     The message being propagated\r
2750         /// </param>\r
2751         void propagate_to_any_targets(message<_Output> *) \r
2752         {\r
2753             // Attempt to create a new message\r
2754             message<_Output> * _Msg = _Create_new_message();\r
2755 \r
2756             if (_Msg != NULL)\r
2757             {\r
2758                 // Add the new message to the outbound queue\r
2759                 _M_messageBuffer.enqueue(_Msg);\r
2760 \r
2761                 if (!_M_messageBuffer.is_head(_Msg->msg_id()))\r
2762                 {\r
2763                     // another message is at the head of the outbound message queue and blocked\r
2764                     // simply return\r
2765                     return;\r
2766                 }\r
2767             }\r
2768 \r
2769             _Propagate_priority_order(_M_messageBuffer);\r
2770         }\r
2771 \r
2772     private:\r
2773 \r
2774         //\r
2775         //  Private Methods\r
2776         //\r
2777 \r
2778         /// <summary>\r
2779         ///     Propagate messages in priority order\r
2780         /// </summary>\r
2781         /// <param name="_MessageBuffer">\r
2782         ///     Reference to a message queue with messages to be propagated\r
2783         /// </param>\r
2784         void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
2785         {\r
2786             message<_Output> * _Msg = _MessageBuffer.peek();\r
2787 \r
2788             // If someone has reserved the _Head message, don't propagate anymore\r
2789             if (_M_pReservedFor != NULL)\r
2790             {\r
2791                 return;\r
2792             }\r
2793 \r
2794             while (_Msg != NULL)\r
2795             {\r
2796                 message_status _Status = declined;\r
2797 \r
2798                 // Always start from the first target that linked\r
2799                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
2800                 {\r
2801                     ITarget<_Output> * _PTarget = *_Iter;\r
2802                     _Status = _PTarget->propagate(_Msg, this);\r
2803 \r
2804                     // Ownership of message changed. Do not propagate this\r
2805                     // message to any other target.\r
2806                     if (_Status == accepted)\r
2807                     {\r
2808                         break;\r
2809                     }\r
2810 \r
2811                     // If the target just propagated to reserved this message, stop\r
2812                     // propagating it to others\r
2813                     if (_M_pReservedFor != NULL)\r
2814                     {\r
2815                         break;\r
2816                     }\r
2817                 }\r
2818 \r
2819                 // If status is anything other than accepted, then the head message\r
2820                 // was not propagated out.  Thus, nothing after it in the queue can\r
2821                 // be propagated out.  Cease propagation.\r
2822                 if (_Status != accepted)\r
2823                 {\r
2824                     break;\r
2825                 }\r
2826 \r
2827                 // Get the next message\r
2828                 _Msg = _MessageBuffer.peek();\r
2829             }\r
2830         }\r
2831 \r
2832         /// <summary>\r
2833         ///     Create a new message from the data output\r
2834         /// </summary>\r
2835         /// <returns>\r
2836         ///     The created message (NULL if creation failed)\r
2837         /// </returns>\r
2838         message<_Output> * __cdecl _Create_new_message()\r
2839         {\r
2840             // If this is a non-greedy join, check each source and try to consume their message\r
2841             size_t _NumInputs = _M_savedMessageIdArray.size();\r
2842 \r
2843             // The iterator _Iter below will ensure that it is safe to touch\r
2844             // non-NULL source pointers. Take a snapshot.\r
2845             std::vector<ISource<_Input> *> _Sources;\r
2846             source_iterator _Iter = _M_connectedSources.begin();\r
2847 \r
2848             while (*_Iter != NULL)\r
2849             {\r
2850                 ISource<_Input> * _PSource = *_Iter;\r
2851 \r
2852                 if (_PSource == NULL)\r
2853                 {\r
2854                     break;\r
2855                 }\r
2856 \r
2857                 _Sources.push_back(_PSource);\r
2858                 ++_Iter;\r
2859             }\r
2860 \r
2861             if (_Sources.size() != _NumInputs)\r
2862             {\r
2863                 // Some of the sources were unlinked. The join is broken\r
2864                 return NULL;\r
2865             }\r
2866 \r
2867             // First, try and reserve all the messages.  If a reservation fails,\r
2868             // then release any reservations that had been made.\r
2869             for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)\r
2870             {\r
2871                 // Swap the id to -1 indicating that we have used that value for a recalculate\r
2872                 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);\r
2873 \r
2874                 // If the id is -1, either we have never received a message on that link or the previous message is stored\r
2875                 // in the message array. If it is the former we abort. \r
2876                 // If the id is not -1, we attempt to reserve the message. On failure we abort.\r
2877                 if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL))\r
2878                     || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this)))\r
2879                 {\r
2880                     // Abort. Release all reservations made up until this block, \r
2881                     // and wait for another message to arrive.\r
2882                     for (size_t j = 0; j < i; j++)\r
2883                     {\r
2884                         if (_M_savedIdBuffer[j] != -1)\r
2885                         {\r
2886                             _Sources[j]->release(_M_savedIdBuffer[j], this);\r
2887                             _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1);\r
2888                         }\r
2889                     }\r
2890 \r
2891                     // Return NULL to indicate that the create failed\r
2892                     return NULL;\r
2893                 }  \r
2894             }\r
2895 \r
2896             // Since everything has been reserved, consume all the messages.\r
2897             // This is guaranteed to return true.\r
2898             size_t _NewMessages = 0;\r
2899             for (size_t i = 0; i < _NumInputs; i++)\r
2900             {\r
2901                 if (_M_savedIdBuffer[i] != -1)\r
2902                 {\r
2903                     // Delete previous message since we have a new one\r
2904                     if (_M_messageArray[i] != NULL)\r
2905                     {\r
2906                         delete _M_messageArray[i];\r
2907                     }\r
2908                     _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);\r
2909                     _M_savedIdBuffer[i] = -1;\r
2910                     _NewMessages++;\r
2911                 }\r
2912             }\r
2913 \r
2914             if (_NewMessages == 0)\r
2915             {\r
2916                 // There is no need to recal if we did not consume a new message\r
2917                 return NULL;\r
2918             }\r
2919 \r
2920             std::vector<_Input> _OutputVector;\r
2921             for (size_t i = 0; i < _NumInputs; i++)\r
2922             {\r
2923                 _ASSERTE(_M_messageArray[i] != NULL);\r
2924                 _OutputVector.push_back(_M_messageArray[i]->payload);\r
2925             }\r
2926 \r
2927             _Output _Out = _M_pFunc(_OutputVector);\r
2928             return (new message<_Output>(_Out));\r
2929         }\r
2930 \r
2931         /// <summary>\r
2932         ///     Initialize the recalculate block\r
2933         /// </summary>\r
2934         /// <param name="_NumInputs">\r
2935         ///     The number of inputs\r
2936         /// </param>\r
2937         void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
2938         {\r
2939             initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
2940 \r
2941             _M_connectedSources.set_bound(_NumInputs);\r
2942 \r
2943             // Non greedy joins need a buffer to snap off saved message ids to.\r
2944             _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
2945             memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
2946         }\r
2947 \r
2948         // An array containing the accepted messages of this join.\r
2949         std::vector<message<_Input>*> _M_messageArray;\r
2950 \r
2951         // An array containing the msg ids of messages propagated to the array\r
2952         // For non-greedy joins, this contains the message id of any message \r
2953         // passed to it.\r
2954         std::vector<runtime_object_identity> _M_savedMessageIdArray;\r
2955 \r
2956         // Buffer for snapping saved ids in non-greedy joins\r
2957         runtime_object_identity * _M_savedIdBuffer;\r
2958 \r
2959         // The transformer method called by this block\r
2960         _Recalculate_method _M_pFunc;\r
2961 \r
2962         // Queue to hold output messages\r
2963         MessageQueue<_Output> _M_messageBuffer;\r
2964     };\r
2965 \r
2966     //\r
2967     // Container class to hold a join_transform block and keep unbounded buffers in front of each input.\r
2968     //\r
2969     template<class _Input, class _Output>\r
2970     class buffered_join\r
2971     {\r
2972         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
2973 \r
2974     public:\r
2975         buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs)\r
2976         {\r
2977             m_buffers = new unbounded_buffer<_Input>*[_NumInputs];\r
2978             m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func);\r
2979 \r
2980             for(int i = 0; i < _NumInputs; i++)\r
2981             {\r
2982                 m_buffers[i] = new unbounded_buffer<_Input>;\r
2983                 m_buffers[i]->link_target(m_join);\r
2984             }\r
2985         }\r
2986 \r
2987         ~buffered_join()\r
2988         {\r
2989             for(int i = 0; i < m_numInputs; i++)\r
2990                 delete m_buffers[i];\r
2991             delete [] m_buffers;\r
2992             delete m_join;\r
2993         }\r
2994 \r
2995         // Add input takes _PSource and connects it to the next input on this block\r
2996         void add_input(ISource<_Input> * _PSource)\r
2997         {\r
2998             _PSource->link_target(m_buffers[m_currentInput]);\r
2999             m_currentInput++;\r
3000         }\r
3001 \r
3002         // link_target links this container block to _PTarget\r
3003         void link_target(ITarget<_Output> * _PTarget)\r
3004         {\r
3005             m_join->link_target(_PTarget);\r
3006         }\r
3007     private:\r
3008 \r
3009         int m_currentInput;\r
3010         int m_numInputs;\r
3011         unbounded_buffer<_Input> ** m_buffers;\r
3012         join_transform<_Input,_Output,greedy> * m_join;\r
3013     };\r
3014 } // namespace Concurrency\r