]> git.sesse.net Git - casparcg/blob - concrt_extras/agents_extras.h
dependencies: Cleanup
[casparcg] / concrt_extras / agents_extras.h
1 //--------------------------------------------------------------------------\r
2 // \r
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. \r
4 // \r
5 //  File: agents_extras.h\r
6 //\r
7 //  Implementation of various useful message blocks\r
8 //\r
9 //--------------------------------------------------------------------------\r
10 \r
11 #pragma once\r
12 \r
13 #include <agents.h>\r
14 \r
15 // bounded_buffer uses a map\r
16 #include <map>\r
17 #include <queue>\r
18 \r
19 namespace Concurrency\r
20 {\r
21     /// <summary>\r
22     ///     Simple queue class for storing messages.\r
23     /// </summary>\r
24     /// <typeparam name="_Type">\r
25     ///     The payload type of messages stored in this queue.\r
26     /// </typeparam>\r
27     template <class _Type>\r
28     class MessageQueue\r
29     {\r
30     public:\r
31         typedef message<_Type> _Message;\r
32 \r
33         /// <summary>\r
34         ///     Constructs an initially empty queue.\r
35         /// </summary>\r
36         MessageQueue()\r
37         {\r
38         }\r
39 \r
40         /// <summary>\r
41         ///     Removes and deletes any messages remaining in the queue.\r
42         /// </summary>\r
43         ~MessageQueue() \r
44         {\r
45             _Message * _Msg = dequeue();\r
46             while (_Msg != NULL)\r
47             {\r
48                 delete _Msg;\r
49                 _Msg = dequeue();\r
50             }\r
51         }\r
52 \r
53         /// <summary>\r
54         ///     Add an item to the queue.\r
55         /// </summary>\r
56         /// <param name="_Msg">\r
57         ///     Message to add.\r
58         /// </param>\r
59         void enqueue(_Message *_Msg)\r
60         {\r
61             _M_queue.push(_Msg);\r
62         }\r
63 \r
64         /// <summary>\r
65         ///     Dequeue an item from the head of queue.\r
66         /// </summary>\r
67         /// <returns>\r
68         ///     Returns a pointer to the message found at the head of the queue.\r
69         /// </returns>\r
70         _Message * dequeue()\r
71         {\r
72             _Message * _Msg = NULL;\r
73 \r
74             if (!_M_queue.empty())\r
75             {\r
76                 _Msg = _M_queue.front();\r
77                 _M_queue.pop();\r
78             }\r
79 \r
80             return _Msg;\r
81         }\r
82 \r
83         /// <summary>\r
84         ///     Return the item at the head of the queue, without dequeuing\r
85         /// </summary>\r
86         /// <returns>\r
87         ///     Returns a pointer to the message found at the head of the queue.\r
88         /// </returns>\r
89         _Message * peek() const\r
90         {\r
91             _Message * _Msg = NULL;\r
92 \r
93             if (!_M_queue.empty())\r
94             {\r
95                 _Msg = _M_queue.front();\r
96             }\r
97 \r
98             return _Msg;\r
99         }\r
100 \r
101         /// <summary>\r
102         ///     Returns the number of items currently in the queue.\r
103         /// </summary>\r
104         /// <returns>\r
105         /// Size of the queue.\r
106         /// </returns>\r
107         size_t count() const\r
108         {\r
109             return _M_queue.size();\r
110         }\r
111 \r
112         /// <summary>\r
113         ///     Checks to see if specified msg id is at the head of the queue.\r
114         /// </summary>\r
115         /// <param name="_MsgId">\r
116         ///     Message id to check for.\r
117         /// </param>\r
118         /// <returns>\r
119         ///     True if a message with specified id is at the head, false otherwise.\r
120         /// </returns>\r
121         bool is_head(const runtime_object_identity _MsgId) const\r
122         {\r
123             _Message * _Msg = peek();\r
124             if(_Msg != NULL)\r
125             {\r
126                 return _Msg->msg_id() == _MsgId;\r
127             }\r
128             return false;\r
129         }\r
130 \r
131     private:\r
132         \r
133         std::queue<_Message *> _M_queue;\r
134     };\r
135 \r
136     /// <summary>\r
137     ///     Simple queue implementation that takes into account priority\r
138     ///     using the comparison operator <.\r
139     /// </summary>\r
140     /// <typeparam name="_Type">\r
141     ///     The payload type of messages stored in this queue.\r
142     /// </typeparam>\r
143     template <class _Type>\r
144     class PriorityQueue\r
145     {\r
146     public:\r
147         /// <summary>\r
148         ///     Constructs an initially empty queue.\r
149         /// </summary>\r
150         PriorityQueue() : _M_pHead(NULL), _M_count(0) {}\r
151 \r
152         /// <summary>\r
153         ///     Removes and deletes any messages remaining in the queue.\r
154         /// </summary>\r
155         ~PriorityQueue() \r
156         {\r
157             message<_Type> * _Msg = dequeue();\r
158             while (_Msg != NULL)\r
159             {\r
160                 delete _Msg;\r
161                 _Msg = dequeue();\r
162             }\r
163         }\r
164 \r
165         /// <summary>\r
166         ///     Add an item to the queue, comparisons using the 'payload' field\r
167         ///     will determine the location in the queue.\r
168         /// </summary>\r
169         /// <param name="_Msg">\r
170         ///     Message to add.\r
171         /// </param>\r
172         /// <param name="fCanReplaceHead">\r
173         ///     True if this new message can be inserted at the head.\r
174         /// </param>\r
175         void enqueue(message<_Type> *_Msg, const bool fInsertAtHead = true)\r
176         {\r
177             MessageNode *_Element = new MessageNode();\r
178             _Element->_M_pMsg = _Msg;\r
179 \r
180             // Find location to insert.\r
181             MessageNode *pCurrent = _M_pHead;\r
182             MessageNode *pPrev = NULL;\r
183             if(!fInsertAtHead && pCurrent != NULL)\r
184             {\r
185                 pPrev = pCurrent;\r
186                 pCurrent = pCurrent->_M_pNext;\r
187             }\r
188             while(pCurrent != NULL)\r
189             {\r
190                 if(_Element->_M_pMsg->payload < pCurrent->_M_pMsg->payload)\r
191                 {\r
192                     break;\r
193                 }\r
194                 pPrev = pCurrent;\r
195                 pCurrent = pCurrent->_M_pNext;\r
196             }\r
197 \r
198             // Insert at head.\r
199             if(pPrev == NULL)\r
200             {\r
201                 _M_pHead = _Element;\r
202             }\r
203             else\r
204             {\r
205                 pPrev->_M_pNext = _Element;\r
206             }\r
207 \r
208             // Last item in queue.\r
209             if(pCurrent == NULL)\r
210             {\r
211                 _Element->_M_pNext = NULL;\r
212             }\r
213             else\r
214             {\r
215                 _Element->_M_pNext = pCurrent;\r
216             }\r
217 \r
218             ++_M_count;\r
219         }\r
220 \r
221         /// <summary>\r
222         ///     Dequeue an item from the head of queue.\r
223         /// </summary>\r
224         /// <returns>\r
225         ///     Returns a pointer to the message found at the head of the queue.\r
226         /// </returns>\r
227         message<_Type> * dequeue()\r
228         {\r
229             if (_M_pHead == NULL) \r
230             {\r
231                 return NULL;\r
232             }\r
233 \r
234             MessageNode *_OldHead = _M_pHead;\r
235             message<_Type> * _Result = _OldHead->_M_pMsg;\r
236 \r
237             _M_pHead = _OldHead->_M_pNext;\r
238 \r
239             delete _OldHead;\r
240 \r
241             if(--_M_count == 0)\r
242             {\r
243                 _M_pHead = NULL;\r
244             }\r
245             return _Result;\r
246         }\r
247 \r
248         /// <summary>\r
249         ///     Return the item at the head of the queue, without dequeuing\r
250         /// </summary>\r
251         /// <returns>\r
252         ///     Returns a pointer to the message found at the head of the queue.\r
253         /// </returns>\r
254         message<_Type> * peek() const\r
255         {\r
256             if(_M_count != 0)\r
257             {\r
258                 return _M_pHead->_M_pMsg;\r
259             }\r
260             return NULL;\r
261         }\r
262 \r
263         /// <summary>\r
264         ///     Returns the number of items currently in the queue.\r
265         /// </summary>\r
266         /// <returns>\r
267         ///     Size of the queue.\r
268         /// </returns>\r
269         size_t count() const\r
270         {\r
271             return _M_count;\r
272         }\r
273 \r
274         /// <summary>\r
275         ///     Checks to see if specified msg id is at the head of the queue.\r
276         /// </summary>\r
277         /// <param name="_MsgId">\r
278         ///     Message id to check for.\r
279         /// </param>\r
280         /// <returns>\r
281         ///     True if a message with specified id is at the head, false otherwise.\r
282         /// </returns>\r
283         bool is_head(const runtime_object_identity _MsgId) const\r
284         {\r
285             if(_M_count != 0)\r
286             {\r
287                 return _M_pHead->_M_pMsg->msg_id() == _MsgId;\r
288             }\r
289             return false;\r
290         }\r
291 \r
292     private:\r
293         \r
294         // Used to store individual message nodes.\r
295         struct MessageNode\r
296         {\r
297             MessageNode() : _M_pMsg(NULL), _M_pNext(NULL) {}\r
298             message<_Type> * _M_pMsg;\r
299             MessageNode * _M_pNext;\r
300         };\r
301 \r
302         // A pointer to the head of the queue.\r
303         MessageNode * _M_pHead;\r
304 \r
305         // The number of elements presently stored in the queue.\r
306         size_t _M_count;\r
307     };\r
308         \r
309     /// <summary>\r
310     ///        priority_buffer is a buffer that uses a comparison operator on the 'payload' of each message to determine\r
311     ///     order when offering to targets. Besides this it acts exactly like an unbounded_buffer.\r
312     /// </summary>\r
313     /// <typeparam name="_Type">\r
314     ///     The payload type of messages stored and propagated by the buffer.\r
315     /// </typeparam>\r
316     template<class _Type>\r
317     class priority_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
318     {\r
319     public:\r
320 \r
321         /// <summary>\r
322         ///     Creates an priority_buffer within the default scheduler, and places it any schedule\r
323         ///     group of the scheduler\92s choosing.\r
324         /// </summary>\r
325         priority_buffer() \r
326         {\r
327             initialize_source_and_target();\r
328         }\r
329 \r
330         /// <summary>\r
331         ///     Creates an priority_buffer within the default scheduler, and places it any schedule\r
332         ///     group of the scheduler\92s choosing.\r
333         /// </summary>\r
334         /// <param name="_Filter">\r
335         ///     A reference to a filter function.\r
336         /// </param>\r
337         priority_buffer(filter_method const& _Filter)\r
338         {\r
339             initialize_source_and_target();\r
340             register_filter(_Filter);\r
341         }\r
342 \r
343         /// <summary>\r
344         ///     Creates an priority_buffer within the specified scheduler, and places it any schedule\r
345         ///     group of the scheduler\92s choosing.\r
346         /// </summary>\r
347         /// <param name="_PScheduler">\r
348         ///     A reference to a scheduler instance.\r
349         /// </param>\r
350         priority_buffer(Scheduler& _PScheduler)\r
351         {\r
352             initialize_source_and_target(&_PScheduler);\r
353         }\r
354 \r
355         /// <summary>\r
356         ///     Creates an priority_buffer within the specified scheduler, and places it any schedule\r
357         ///     group of the scheduler\92s choosing.\r
358         /// </summary>\r
359         /// <param name="_PScheduler">\r
360         ///     A reference to a scheduler instance.\r
361         /// </param>\r
362         /// <param name="_Filter">\r
363         ///     A reference to a filter function.\r
364         /// </param>\r
365         priority_buffer(Scheduler& _PScheduler, filter_method const& _Filter) \r
366         {\r
367             initialize_source_and_target(&_PScheduler);\r
368             register_filter(_Filter);\r
369         }\r
370 \r
371         /// <summary>\r
372         ///     Creates an priority_buffer within the specified schedule group.  The scheduler is implied\r
373         ///     by the schedule group.\r
374         /// </summary>\r
375         /// <param name="_PScheduleGroup">\r
376         ///     A reference to a schedule group.\r
377         /// </param>\r
378         priority_buffer(ScheduleGroup& _PScheduleGroup)\r
379         {\r
380             initialize_source_and_target(NULL, &_PScheduleGroup);\r
381         }\r
382 \r
383         /// <summary>\r
384         ///     Creates an priority_buffer within the specified schedule group.  The scheduler is implied\r
385         ///     by the schedule group.\r
386         /// </summary>\r
387         /// <param name="_PScheduleGroup">\r
388         ///     A reference to a schedule group.\r
389         /// </param>\r
390         /// <param name="_Filter">\r
391         ///     A reference to a filter function.\r
392         /// </param>\r
393         priority_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
394         {\r
395             initialize_source_and_target(NULL, &_PScheduleGroup);\r
396             register_filter(_Filter);\r
397         }\r
398 \r
399         /// <summary>\r
400         ///     Cleans up any resources that may have been created by the priority_buffer.\r
401         /// </summary>\r
402         ~priority_buffer()\r
403         {\r
404             // Remove all links\r
405             remove_network_links();\r
406         }\r
407 \r
408         /// <summary>\r
409         ///     Add an item to the priority_buffer\r
410         /// </summary>\r
411         /// <param name="_Item">\r
412         ///     A reference to the item to add.\r
413         /// </param>\r
414         /// <returns>\r
415         ///     A boolean indicating whether the data was accepted.\r
416         /// </returns>\r
417         bool enqueue(_Type const& _Item)\r
418         {\r
419             return Concurrency::send<_Type>(this, _Item);\r
420         }\r
421 \r
422         /// <summary>\r
423         ///     Remove an item from the priority_buffer\r
424         /// </summary>\r
425         /// <returns>\r
426         ///     The message payload.\r
427         /// </returns>\r
428         _Type dequeue()\r
429         {\r
430             return receive<_Type>(this);\r
431         }\r
432 \r
433     protected:\r
434 \r
435         /// <summary>\r
436         ///     The main propagate() function for ITarget blocks.  Called by a source\r
437         ///     block, generally within an asynchronous task to send messages to its targets.\r
438         /// </summary>\r
439         /// <param name="_PMessage">\r
440         ///     A pointer to the message.\r
441         /// </param>\r
442         /// <param name="_PSource">\r
443         ///     A pointer to the source block offering the message.\r
444         /// </param>\r
445         /// <returns>\r
446         ///     An indication of what the target decided to do with the message.\r
447         /// </returns>\r
448         /// <remarks>\r
449         ///     It is important that calls to propagate do *not* take the same lock on the\r
450         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
451         ///     result in a deadlock with the Consume call.  (in the case of the priority_buffer,\r
452         ///     this lock is the m_internalLock)\r
453         /// </remarks>\r
454         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
455         {\r
456             message_status _Result = accepted;\r
457             //\r
458             // Accept the message being propagated\r
459             // Note: depending on the source block propagating the message\r
460             // this may not necessarily be the same message (pMessage) first\r
461             // passed into the function.\r
462             //\r
463             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
464 \r
465             if (_PMessage != NULL)\r
466             {\r
467                 async_send(_PMessage);\r
468             }\r
469             else\r
470             {\r
471                 _Result = missed;\r
472             }\r
473 \r
474             return _Result;\r
475         }\r
476 \r
477         /// <summary>\r
478         ///     Synchronously sends a message to this block.  When this function completes the message will\r
479         ///     already have propagated into the block.\r
480         /// </summary>\r
481         /// <param name="_PMessage">\r
482         ///     A pointer to the message.\r
483         /// </param>\r
484         /// <param name="_PSource">\r
485         ///     A pointer to the source block offering the message.\r
486         /// </param>\r
487         /// <returns>\r
488         ///     An indication of what the target decided to do with the message.\r
489         /// </returns>\r
490         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
491         {\r
492             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
493 \r
494             if (_PMessage != NULL)\r
495             {\r
496                 sync_send(_PMessage);\r
497             }\r
498             else\r
499             {\r
500                 return missed;\r
501             }\r
502 \r
503             return accepted;\r
504         }\r
505 \r
506         /// <summary>\r
507         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
508         /// </summary>\r
509         /// <param name="_MsgId">\r
510         ///     The runtime object identity of the message.\r
511         /// </param>\r
512         /// <returns>\r
513         ///     A pointer to the message that the caller now has ownership of.\r
514         /// </returns>\r
515         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
516         {\r
517             //\r
518             // Peek at the head message in the message buffer.  If the Ids match\r
519             // dequeue and transfer ownership\r
520             //\r
521             message<_Type> * _Msg = NULL;\r
522 \r
523             if (_M_messageBuffer.is_head(_MsgId))\r
524             {\r
525                 _Msg = _M_messageBuffer.dequeue();\r
526             }\r
527 \r
528             return _Msg;\r
529         }\r
530 \r
531         /// <summary>\r
532         ///     Reserves a message previously offered by the source.\r
533         /// </summary>\r
534         /// <param name="_MsgId">\r
535         ///     The runtime object identity of the message.\r
536         /// </param>\r
537         /// <returns>\r
538         ///     A Boolean indicating whether the reservation worked or not.\r
539         /// </returns>\r
540         /// <remarks>\r
541         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
542         /// </remarks>\r
543         virtual bool reserve_message(runtime_object_identity _MsgId)\r
544         {\r
545             // Allow reservation if this is the head message\r
546             return _M_messageBuffer.is_head(_MsgId);\r
547         }\r
548 \r
549         /// <summary>\r
550         ///     Consumes a message that was reserved previously.\r
551         /// </summary>\r
552         /// <param name="_MsgId">\r
553         ///     The runtime object identity of the message.\r
554         /// </param>\r
555         /// <returns>\r
556         ///     A pointer to the message that the caller now has ownership of.\r
557         /// </returns>\r
558         /// <remarks>\r
559         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
560         /// </remarks>\r
561         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
562         {\r
563             // By default, accept the message\r
564             return accept_message(_MsgId);\r
565         }\r
566 \r
567         /// <summary>\r
568         ///     Releases a previous message reservation.\r
569         /// </summary>\r
570         /// <param name="_MsgId">\r
571         ///     The runtime object identity of the message.\r
572         /// </param>\r
573         virtual void release_message(runtime_object_identity _MsgId)\r
574         {\r
575             // The head message is the one reserved.\r
576             if (!_M_messageBuffer.is_head(_MsgId))\r
577             {\r
578                 throw message_not_found();\r
579             }\r
580         }\r
581 \r
582         /// <summary>\r
583         ///    Resumes propagation after a reservation has been released\r
584         /// </summary>\r
585         virtual void resume_propagation()\r
586         {\r
587             // If there are any messages in the buffer, propagate them out\r
588             if (_M_messageBuffer.count() > 0)\r
589             {\r
590                 async_send(NULL);\r
591             }\r
592         }\r
593 \r
594         /// <summary>\r
595         ///     Notification that a target was linked to this source.\r
596         /// </summary>\r
597         /// <param name="_PTarget">\r
598         ///     A pointer to the newly linked target.\r
599         /// </param>\r
600         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
601         {\r
602             // If the message queue is blocked due to reservation\r
603             // there is no need to do any message propagation\r
604             if (_M_pReservedFor != NULL)\r
605             {\r
606                 return;\r
607             }\r
608 \r
609             message<_Type> * _Msg = _M_messageBuffer.peek();\r
610 \r
611             if (_Msg != NULL)\r
612             {\r
613                 // Propagate the head message to the new target\r
614                 message_status _Status = _PTarget->propagate(_Msg, this);\r
615 \r
616                 if (_Status == accepted)\r
617                 {\r
618                     // The target accepted the message, restart propagation.\r
619                     propagate_to_any_targets(NULL);\r
620                 }\r
621 \r
622                 // If the status is anything other than accepted, then leave\r
623                 // the message queue blocked.\r
624             }\r
625         }\r
626 \r
627         /// <summary>\r
628         /// Takes the message and propagates it to all the targets of this priority_buffer.\r
629         /// </summary>\r
630         /// <param name="_PMessage">\r
631         ///     A pointer to a new message.\r
632         /// </param>\r
633         virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
634         {\r
635             // Enqueue pMessage to the internal unbounded buffer queue if it is non-NULL.\r
636             // _PMessage can be NULL if this LWT was the result of a Repropagate call\r
637             // out of a Consume or Release (where no new message is queued up, but\r
638             // everything remaining in the priority_buffer needs to be propagated out)\r
639             if (_PMessage != NULL)\r
640             {\r
641                 message<_Type> *pPrevHead = _M_messageBuffer.peek();\r
642 \r
643                 // If a reservation is held make sure to not insert this new\r
644                 // message before it.\r
645                 if(_M_pReservedFor != NULL)\r
646                 {\r
647                     _M_messageBuffer.enqueue(_PMessage, false);\r
648                 }\r
649                 else\r
650                 {\r
651                     _M_messageBuffer.enqueue(_PMessage);\r
652                 }\r
653 \r
654                 // If the head message didn't change, we can safely assume that\r
655                 // the head message is blocked and waiting on Consume(), Release() or a new\r
656                 // link_target()\r
657                 if (pPrevHead != NULL && !_M_messageBuffer.is_head(pPrevHead->msg_id()))\r
658                 {\r
659                     return;\r
660                 }\r
661             }\r
662 \r
663             // Attempt to propagate messages to all the targets\r
664             _Propagate_priority_order();\r
665         }\r
666 \r
667     private:\r
668 \r
669         /// <summary>\r
670         ///     Attempts to propagate out any messages currently in the block.\r
671         /// </summary>\r
672         void _Propagate_priority_order()\r
673         {\r
674             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
675 \r
676             // If someone has reserved the _Head message, don't propagate anymore\r
677             if (_M_pReservedFor != NULL)\r
678             {\r
679                 return;\r
680             }\r
681 \r
682             while (_Msg != NULL)\r
683             {\r
684                 message_status _Status = declined;\r
685 \r
686                 // Always start from the first target that linked.\r
687                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
688                 {\r
689                     ITarget<_Target_type> * _PTarget = *_Iter;\r
690                     _Status = _PTarget->propagate(_Msg, this);\r
691 \r
692                     // Ownership of message changed. Do not propagate this\r
693                     // message to any other target.\r
694                     if (_Status == accepted)\r
695                     {\r
696                         break;\r
697                     }\r
698 \r
699                     // If the target just propagated to reserved this message, stop\r
700                     // propagating it to others.\r
701                     if (_M_pReservedFor != NULL)\r
702                     {\r
703                         break;\r
704                     }\r
705                 }\r
706 \r
707                 // If status is anything other than accepted, then the head message\r
708                 // was not propagated out.  Thus, nothing after it in the queue can\r
709                 // be propagated out.  Cease propagation.\r
710                 if (_Status != accepted)\r
711                 {\r
712                     break;\r
713                 }\r
714 \r
715                 // Get the next message\r
716                 _Msg = _M_messageBuffer.peek();\r
717             }\r
718         }\r
719 \r
720         /// <summary>\r
721         ///     Priority Queue used to store messages.\r
722         /// </summary>\r
723         PriorityQueue<_Type> _M_messageBuffer;\r
724 \r
725         //\r
726         // Hide assignment operator and copy constructor.\r
727         //\r
728         priority_buffer const &operator =(priority_buffer const&);  // no assignment operator\r
729         priority_buffer(priority_buffer const &);                   // no copy constructor\r
730     };\r
731 \r
732 \r
733     /// <summary>\r
734     ///    A bounded_buffer implementation. Once the capacity is reached it will save the offered message\r
735     ///    id and postpone. Once below capacity again the bounded_buffer will try to reserve and consume\r
736     ///    any of the postponed messages. Preference is given to previously offered messages before new ones.\r
737     ///\r
738     ///    NOTE: this bounded_buffer implementation contains code that is very unique to this particular block. \r
739     ///          Extreme caution should be taken if code is directly copy and pasted from this class. The bounded_buffer\r
740     ///          implementation uses a critical_section, several interlocked operations, and additional calls to async_send.\r
741     ///          These are needed to not abandon a previously saved message id. Most blocks never have to deal with this problem.\r
742     /// </summary>\r
743     /// <typeparam name="_Type">\r
744     ///     The payload type of messages stored and propagated by the buffer.\r
745     /// </typeparam>\r
746     template<class _Type>\r
747     class bounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
748     {\r
749     public:\r
750         /// <summary>\r
751         ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
752         ///     group of the scheduler\92s choosing.\r
753         /// </summary>\r
754         bounded_buffer(const size_t capacity)\r
755             : _M_capacity(capacity), _M_currentSize(0)\r
756         {\r
757             initialize_source_and_target();\r
758         }\r
759 \r
760         /// <summary>\r
761         ///     Creates an bounded_buffer within the default scheduler, and places it any schedule\r
762         ///     group of the scheduler\92s choosing.\r
763         /// </summary>\r
764         /// <param name="_Filter">\r
765         ///     A reference to a filter function.\r
766         /// </param>\r
767         bounded_buffer(const size_t capacity, filter_method const& _Filter)\r
768             : _M_capacity(capacity), _M_currentSize(0)\r
769         {\r
770             initialize_source_and_target();\r
771             register_filter(_Filter);\r
772         }\r
773 \r
774         /// <summary>\r
775         ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
776         ///     group of the scheduler\92s choosing.\r
777         /// </summary>\r
778         /// <param name="_PScheduler">\r
779         ///     A reference to a scheduler instance.\r
780         /// </param>\r
781         bounded_buffer(const size_t capacity, Scheduler& _PScheduler)\r
782             : _M_capacity(capacity), _M_currentSize(0)\r
783         {\r
784             initialize_source_and_target(&_PScheduler);\r
785         }\r
786 \r
787         /// <summary>\r
788         ///     Creates an bounded_buffer within the specified scheduler, and places it any schedule\r
789         ///     group of the scheduler\92s choosing.\r
790         /// </summary>\r
791         /// <param name="_PScheduler">\r
792         ///     A reference to a scheduler instance.\r
793         /// </param>\r
794         /// <param name="_Filter">\r
795         ///     A reference to a filter function.\r
796         /// </param>\r
797         bounded_buffer(const size_t capacity, Scheduler& _PScheduler, filter_method const& _Filter) \r
798             : _M_capacity(capacity), _M_currentSize(0)\r
799         {\r
800             initialize_source_and_target(&_PScheduler);\r
801             register_filter(_Filter);\r
802         }\r
803 \r
804         /// <summary>\r
805         ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
806         ///     by the schedule group.\r
807         /// </summary>\r
808         /// <param name="_PScheduleGroup">\r
809         ///     A reference to a schedule group.\r
810         /// </param>\r
811         bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup)\r
812             : _M_capacity(capacity), _M_currentSize(0)\r
813         {\r
814             initialize_source_and_target(NULL, &_PScheduleGroup);\r
815         }\r
816 \r
817         /// <summary>\r
818         ///     Creates an bounded_buffer within the specified schedule group.  The scheduler is implied\r
819         ///     by the schedule group.\r
820         /// </summary>\r
821         /// <param name="_PScheduleGroup">\r
822         ///     A reference to a schedule group.\r
823         /// </param>\r
824         /// <param name="_Filter">\r
825         ///     A reference to a filter function.\r
826         /// </param>\r
827         bounded_buffer(const size_t capacity, ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
828             : _M_capacity(capacity), _M_currentSize(0)\r
829         {\r
830             initialize_source_and_target(NULL, &_PScheduleGroup);\r
831             register_filter(_Filter);\r
832         }\r
833 \r
834         /// <summary>\r
835         ///     Cleans up any resources that may have been used by the bounded_buffer.\r
836         /// </summary>\r
837         ~bounded_buffer()\r
838         {\r
839             // Remove all links\r
840             remove_network_links();\r
841         }\r
842 \r
843         /// <summary>\r
844         ///     Add an item to the bounded_buffer.\r
845         /// </summary>\r
846         /// <param name="_Item">\r
847         ///     A reference to the item to add.\r
848         /// </param>\r
849         /// <returns>\r
850         ///     A boolean indicating whether the data was accepted.\r
851         /// </returns>\r
852         bool enqueue(_Type const& _Item)\r
853         {\r
854             return Concurrency::send<_Type>(this, _Item);\r
855         }\r
856 \r
857         /// <summary>\r
858         ///     Remove an item from the bounded_buffer.\r
859         /// </summary>\r
860         /// <returns>\r
861         ///     The message payload.\r
862         /// </returns>\r
863         _Type dequeue()\r
864         {\r
865             return receive<_Type>(this);\r
866         }\r
867 \r
868     protected:\r
869 \r
870         /// <summary>\r
871         ///     The main propagate() function for ITarget blocks.  Called by a source\r
872         ///     block, generally within an asynchronous task to send messages to its targets.\r
873         /// </summary>\r
874         /// <param name="_PMessage">\r
875         ///     A pointer to the message.\r
876         /// </param>\r
877         /// <param name="_PSource">\r
878         ///     A pointer to the source block offering the message.\r
879         /// </param>\r
880         /// <returns>\r
881         ///     An indication of what the target decided to do with the message.\r
882         /// </returns>\r
883         /// <remarks>\r
884         ///     It is important that calls to propagate do *not* take the same lock on the\r
885         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
886         ///     result in a deadlock with the Consume call.  (in the case of the bounded_buffer,\r
887         ///     this lock is the m_internalLock)\r
888         /// </remarks>\r
889         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
890         {\r
891             message_status _Result = accepted;\r
892             \r
893             // Check current capacity. \r
894             if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
895             {\r
896                 // Postpone the message, buffer is full.\r
897                 _InterlockedDecrement(&_M_currentSize);\r
898                 _Result = postponed;\r
899 \r
900                 // Save off the message id from this source to later try\r
901                 // and reserve/consume when more space is free.\r
902                 {\r
903                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
904                     _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
905                 }\r
906 \r
907                 async_send(NULL);\r
908             }\r
909             else\r
910             {\r
911                 //\r
912                 // Accept the message being propagated\r
913                 // Note: depending on the source block propagating the message\r
914                 // this may not necessarily be the same message (pMessage) first\r
915                 // passed into the function.\r
916                 //\r
917                 _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
918 \r
919                 if (_PMessage != NULL)\r
920                 {\r
921                     async_send(_PMessage);\r
922                 }\r
923                 else\r
924                 {\r
925                     // Didn't get a message so need to decrement.\r
926                     _InterlockedDecrement(&_M_currentSize);\r
927                     _Result = missed;\r
928                     async_send(NULL);\r
929                 }\r
930             }\r
931 \r
932             return _Result;\r
933         }\r
934 \r
935         /// <summary>\r
936         ///     Synchronously sends a message to this block.  When this function completes the message will\r
937         ///     already have propagated into the block.\r
938         /// </summary>\r
939         /// <param name="_PMessage">\r
940         ///     A pointer to the message.\r
941         /// </param>\r
942         /// <param name="_PSource">\r
943         ///     A pointer to the source block offering the message.\r
944         /// </param>\r
945         /// <returns>\r
946         ///     An indication of what the target decided to do with the message.\r
947         /// </returns>\r
948         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
949         {\r
950             message_status _Result = accepted;\r
951             \r
952             // Check current capacity. \r
953             if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
954             {\r
955                 // Postpone the message, buffer is full.\r
956                 _InterlockedDecrement(&_M_currentSize);\r
957                 _Result = postponed;\r
958 \r
959                 // Save off the message id from this source to later try\r
960                 // and reserve/consume when more space is free.\r
961                 {\r
962                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
963                     _M_savedSourceMsgIds[_PSource] = _PMessage->msg_id();\r
964                 }\r
965 \r
966                 async_send(NULL);\r
967             }\r
968             else\r
969             {\r
970                 //\r
971                 // Accept the message being propagated\r
972                 // Note: depending on the source block propagating the message\r
973                 // this may not necessarily be the same message (pMessage) first\r
974                 // passed into the function.\r
975                 //\r
976                 _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
977 \r
978                 if (_PMessage != NULL)\r
979                 {\r
980                     async_send(_PMessage);\r
981                 }\r
982                 else\r
983                 {\r
984                     // Didn't get a message so need to decrement.\r
985                     _InterlockedDecrement(&_M_currentSize);\r
986                     _Result = missed;\r
987                     async_send(NULL);\r
988                 }\r
989             }\r
990 \r
991             return _Result;\r
992         }\r
993 \r
994         /// <summary>\r
995         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
996         /// </summary>\r
997         /// <param name="_MsgId">\r
998         ///     The runtime object identity of the message.\r
999         /// </param>\r
1000         /// <returns>\r
1001         ///     A pointer to the message that the caller now has ownership of.\r
1002         /// </returns>\r
1003         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
1004         {\r
1005             //\r
1006             // Peek at the head message in the message buffer.  If the Ids match\r
1007             // dequeue and transfer ownership\r
1008             //\r
1009             message<_Type> * _Msg = NULL;\r
1010 \r
1011             if (_M_messageBuffer.is_head(_MsgId))\r
1012             {\r
1013                 _Msg = _M_messageBuffer.dequeue();\r
1014 \r
1015                 // Give preference to any previously postponed messages\r
1016                 // before decrementing current size.\r
1017                 if(!try_consume_msg())\r
1018                 {\r
1019                     _InterlockedDecrement(&_M_currentSize);\r
1020                 }\r
1021             }\r
1022 \r
1023             return _Msg;\r
1024         }\r
1025 \r
1026         /// <summary>\r
1027         ///     Try to reserve and consume a message from list of saved message ids.\r
1028         /// </summary>\r
1029         /// <returns>\r
1030         ///     True if a message was sucessfully consumed, false otherwise.\r
1031         /// </returns>\r
1032         bool try_consume_msg()\r
1033         {\r
1034             runtime_object_identity _ReservedId = -1;\r
1035             ISource<_Type> * _PSource = NULL;\r
1036 \r
1037             // Walk through source links seeing if any saved ids exist.\r
1038             bool _ConsumedMsg = true;\r
1039             while(_ConsumedMsg)\r
1040             {\r
1041                 source_iterator _Iter = _M_connectedSources.begin();\r
1042                 {\r
1043                     critical_section::scoped_lock scopedLock(_M_savedIdsLock);\r
1044                     for (; *_Iter != NULL; ++_Iter)\r
1045                     {\r
1046                         _PSource = *_Iter;\r
1047                         std::map<ISource<_Type> *, runtime_object_identity>::iterator _MapIter;\r
1048                         if((_MapIter = _M_savedSourceMsgIds.find(_PSource)) != _M_savedSourceMsgIds.end())\r
1049                         {\r
1050                             _ReservedId = _MapIter->second;\r
1051                             _M_savedSourceMsgIds.erase(_MapIter);\r
1052                             break;\r
1053                         }\r
1054                     }\r
1055                 }\r
1056 \r
1057                 // Can't call into source block holding _M_savedIdsLock, that would be a recipe for disaster.\r
1058                 if(_ReservedId != -1)\r
1059                 {\r
1060                     if(_PSource->reserve(_ReservedId, this))\r
1061                     {\r
1062                         message<_Type> * _ConsumedMsg = _PSource->consume(_ReservedId, this);\r
1063                         async_send(_ConsumedMsg);\r
1064                         return true;\r
1065                     }\r
1066                     // Reserve failed go or link was removed, \r
1067                     // go back and try and find a different msg id.\r
1068                     else\r
1069                     {\r
1070                         continue;\r
1071                     }\r
1072                 }\r
1073 \r
1074                 // If this point is reached the map of source ids was empty.\r
1075                 break;\r
1076             }\r
1077 \r
1078             return false;\r
1079         }\r
1080 \r
1081         /// <summary>\r
1082         ///     Reserves a message previously offered by the source.\r
1083         /// </summary>\r
1084         /// <param name="_MsgId">\r
1085         ///     The runtime object identity of the message.\r
1086         /// </param>\r
1087         /// <returns>\r
1088         ///     A Boolean indicating whether the reservation worked or not.\r
1089         /// </returns>\r
1090         /// <remarks>\r
1091         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
1092         /// </remarks>\r
1093         virtual bool reserve_message(runtime_object_identity _MsgId)\r
1094         {\r
1095             // Allow reservation if this is the head message\r
1096             return _M_messageBuffer.is_head(_MsgId);\r
1097         }\r
1098 \r
1099         /// <summary>\r
1100         ///     Consumes a message that was reserved previously.\r
1101         /// </summary>\r
1102         /// <param name="_MsgId">\r
1103         ///     The runtime object identity of the message.\r
1104         /// </param>\r
1105         /// <returns>\r
1106         ///     A pointer to the message that the caller now has ownership of.\r
1107         /// </returns>\r
1108         /// <remarks>\r
1109         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
1110         /// </remarks>\r
1111         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
1112         {\r
1113             // By default, accept the message\r
1114             return accept_message(_MsgId);\r
1115         }\r
1116 \r
1117         /// <summary>\r
1118         ///     Releases a previous message reservation.\r
1119         /// </summary>\r
1120         /// <param name="_MsgId">\r
1121         ///     The runtime object identity of the message.\r
1122         /// </param>\r
1123         virtual void release_message(runtime_object_identity _MsgId)\r
1124         {\r
1125             // The head message is the one reserved.\r
1126             if (!_M_messageBuffer.is_head(_MsgId))\r
1127             {\r
1128                 throw message_not_found();\r
1129             }\r
1130         }\r
1131 \r
1132         /// <summary>\r
1133         ///    Resumes propagation after a reservation has been released\r
1134         /// </summary>\r
1135         virtual void resume_propagation()\r
1136         {\r
1137             // If there are any messages in the buffer, propagate them out\r
1138             if (_M_messageBuffer.count() > 0)\r
1139             {\r
1140                 async_send(NULL);\r
1141             }\r
1142         }\r
1143 \r
1144         /// <summary>\r
1145         ///     Notification that a target was linked to this source.\r
1146         /// </summary>\r
1147         /// <param name="_PTarget">\r
1148         ///     A pointer to the newly linked target.\r
1149         /// </param>\r
1150         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
1151         {\r
1152             // If the message queue is blocked due to reservation\r
1153             // there is no need to do any message propagation\r
1154             if (_M_pReservedFor != NULL)\r
1155             {\r
1156                 return;\r
1157             }\r
1158 \r
1159             message<_Type> * _Msg = _M_messageBuffer.peek();\r
1160 \r
1161             if (_Msg != NULL)\r
1162             {\r
1163                 // Propagate the head message to the new target\r
1164                 message_status _Status = _PTarget->propagate(_Msg, this);\r
1165 \r
1166                 if (_Status == accepted)\r
1167                 {\r
1168                     // The target accepted the message, restart propagation.\r
1169                     propagate_to_any_targets(NULL);\r
1170                 }\r
1171 \r
1172                 // If the status is anything other than accepted, then leave\r
1173                 // the message queue blocked.\r
1174             }\r
1175         }\r
1176 \r
1177         /// <summary>\r
1178         ///     Takes the message and propagates it to all the targets of this bounded_buffer.\r
1179         ///     This is called from async_send.\r
1180         /// </summary>\r
1181         /// <param name="_PMessage">\r
1182         ///     A pointer to a new message.\r
1183         /// </param>\r
1184                 virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
1185         {\r
1186             // Enqueue pMessage to the internal message buffer if it is non-NULL.\r
1187             // pMessage can be NULL if this LWT was the result of a Repropagate call\r
1188            // out of a Consume or Release (where no new message is queued up, but\r
1189             // everything remaining in the bounded buffer needs to be propagated out)\r
1190             if (_PMessage != NULL)\r
1191             {\r
1192                 _M_messageBuffer.enqueue(_PMessage);\r
1193 \r
1194                 // If the incoming pMessage is not the head message, we can safely assume that\r
1195                 // the head message is blocked and waiting on Consume(), Release() or a new\r
1196                 // link_target() and cannot be propagated out.\r
1197                 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))\r
1198                 {\r
1199                     return;\r
1200                 }\r
1201             }\r
1202 \r
1203             _Propagate_priority_order();\r
1204             \r
1205             {\r
1206 \r
1207                 // While current size is less than capacity try to consume\r
1208                 // any previously offered ids.\r
1209                 bool _ConsumedMsg = true;\r
1210                 while(_ConsumedMsg)\r
1211                 {\r
1212                     // Assume a message will be found to successfully consume in the\r
1213                     // saved ids, if not this will be decremented afterwards.\r
1214                     if((size_t)_InterlockedIncrement(&_M_currentSize) > _M_capacity)\r
1215                     {\r
1216                         break;\r
1217                     }\r
1218 \r
1219                     _ConsumedMsg = try_consume_msg();\r
1220                 }\r
1221 \r
1222                 // Decrement the current size, we broke out of the previous loop\r
1223                 // because we reached capacity or there were no more messages to consume.\r
1224                 _InterlockedDecrement(&_M_currentSize);\r
1225             }\r
1226         }\r
1227 \r
1228     private:\r
1229 \r
1230         /// <summary>\r
1231         ///     Attempts to propagate out any messages currently in the block.\r
1232         /// </summary>\r
1233         void _Propagate_priority_order()\r
1234         {\r
1235             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
1236 \r
1237             // If someone has reserved the _Head message, don't propagate anymore\r
1238             if (_M_pReservedFor != NULL)\r
1239             {\r
1240                 return;\r
1241             }\r
1242 \r
1243             while (_Msg != NULL)\r
1244             {\r
1245                 message_status _Status = declined;\r
1246 \r
1247                 // Always start from the first target that linked.\r
1248                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
1249                 {\r
1250                     ITarget<_Target_type> * _PTarget = *_Iter;\r
1251                     _Status = _PTarget->propagate(_Msg, this);\r
1252 \r
1253                     // Ownership of message changed. Do not propagate this\r
1254                     // message to any other target.\r
1255                     if (_Status == accepted)\r
1256                     {\r
1257                         break;\r
1258                     }\r
1259 \r
1260                     // If the target just propagated to reserved this message, stop\r
1261                     // propagating it to others.\r
1262                     if (_M_pReservedFor != NULL)\r
1263                     {\r
1264                         break;\r
1265                     }\r
1266                 }\r
1267 \r
1268                 // If status is anything other than accepted, then the head message\r
1269                 // was not propagated out.  Thus, nothing after it in the queue can\r
1270                 // be propagated out.  Cease propagation.\r
1271                 if (_Status != accepted)\r
1272                 {\r
1273                     break;\r
1274                 }\r
1275 \r
1276                 // Get the next message\r
1277                 _Msg = _M_messageBuffer.peek();\r
1278             }\r
1279         }\r
1280 \r
1281         /// <summary>\r
1282         ///     Message buffer used to store messages.\r
1283         /// </summary>\r
1284         MessageQueue<_Type> _M_messageBuffer;\r
1285 \r
1286         /// <summary>\r
1287         ///     Maximum number of messages bounded_buffer can hold.\r
1288         /// </summary>\r
1289         const size_t _M_capacity;\r
1290 \r
1291         /// <summary>\r
1292         ///     Current number of messages in bounded_buffer.\r
1293         /// </summary>\r
1294         volatile long _M_currentSize;\r
1295 \r
1296         /// <summary>\r
1297         ///     Lock used to guard saved message ids map.\r
1298         /// </summary>\r
1299         critical_section _M_savedIdsLock;\r
1300 \r
1301         /// <summary>\r
1302         ///     Map of source links to saved message ids.\r
1303         /// </summary>\r
1304         std::map<ISource<_Type> *, runtime_object_identity> _M_savedSourceMsgIds;\r
1305 \r
1306         //\r
1307         // Hide assignment operator and copy constructor\r
1308         //\r
1309         bounded_buffer const &operator =(bounded_buffer const&);  // no assignment operator\r
1310         bounded_buffer(bounded_buffer const &);                   // no copy constructor\r
1311     };\r
1312 \r
1313     /// <summary>\r
1314     ///        A simple alternator, offers messages in order to each target\r
1315     ///     one at a time. If a consume occurs a message won't be offered to that target again\r
1316     ///     until all others are given a chance. This causes messages to be distributed more\r
1317     ///     evenly among targets.\r
1318     /// </summary>\r
1319     /// <typeparam name="_Type">\r
1320     ///     The payload type of messages stored and propagated by the buffer.\r
1321     /// </typeparam>\r
1322     template<class _Type>\r
1323     class alternator : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>\r
1324     {\r
1325     public:\r
1326         /// <summary>\r
1327         ///     Create an alternator within the default scheduler, and places it any schedule\r
1328         ///     group of the scheduler\92s choosing.\r
1329         /// </summary>\r
1330         alternator()\r
1331             : _M_indexNextTarget(0)\r
1332         {\r
1333             initialize_source_and_target();\r
1334         }\r
1335 \r
1336         /// <summary>\r
1337         ///     Creates an alternator within the default scheduler, and places it any schedule\r
1338         ///     group of the scheduler\92s choosing.\r
1339         /// </summary>\r
1340         /// <param name="_Filter">\r
1341         ///     A reference to a filter function.\r
1342         /// </param>\r
1343         alternator(filter_method const& _Filter)\r
1344             : _M_indexNextTarget(0)\r
1345         {\r
1346             initialize_source_and_target();\r
1347             register_filter(_Filter);\r
1348         }\r
1349 \r
1350         /// <summary>\r
1351         ///     Creates an alternator within the specified scheduler, and places it any schedule\r
1352         ///     group of the scheduler\92s choosing.\r
1353         /// </summary>\r
1354         /// <param name="_PScheduler">\r
1355         ///     A reference to a scheduler instance.\r
1356         /// </param>\r
1357         alternator(Scheduler& _PScheduler)\r
1358             : _M_indexNextTarget(0)\r
1359         {\r
1360             initialize_source_and_target(&_PScheduler);\r
1361         }\r
1362 \r
1363         /// <summary>\r
1364         ///     Creates an alternator within the specified scheduler, and places it any schedule\r
1365         ///     group of the scheduler\92s choosing.\r
1366         /// </summary>\r
1367         /// <param name="_PScheduler">\r
1368         ///     A reference to a scheduler instance.\r
1369         /// </param>\r
1370         /// <param name="_Filter">\r
1371         ///     A reference to a filter function.\r
1372         /// </param>\r
1373         alternator(Scheduler& _PScheduler, filter_method const& _Filter) \r
1374             : _M_indexNextTarget(0)\r
1375         {\r
1376             initialize_source_and_target(&_PScheduler);\r
1377             register_filter(_Filter);\r
1378         }\r
1379 \r
1380         /// <summary>\r
1381         ///     Creates an alternator within the specified schedule group.  The scheduler is implied\r
1382         ///     by the schedule group.\r
1383         /// </summary>\r
1384         /// <param name="_PScheduleGroup">\r
1385         ///     A reference to a schedule group.\r
1386         /// </param>\r
1387         alternator(ScheduleGroup& _PScheduleGroup)\r
1388             : _M_indexNextTarget(0)\r
1389         {\r
1390             initialize_source_and_target(NULL, &_PScheduleGroup);\r
1391         }\r
1392 \r
1393         /// <summary>\r
1394         ///     Creates an alternator within the specified schedule group.  The scheduler is implied\r
1395         ///     by the schedule group.\r
1396         /// </summary>\r
1397         /// <param name="_PScheduleGroup">\r
1398         ///     A reference to a schedule group.\r
1399         /// </param>\r
1400         /// <param name="_Filter">\r
1401         ///     A reference to a filter function.\r
1402         /// </param>\r
1403         alternator(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter)\r
1404             : _M_indexNextTarget(0)\r
1405         {\r
1406             initialize_source_and_target(NULL, &_PScheduleGroup);\r
1407             register_filter(_Filter);\r
1408         }\r
1409 \r
1410         /// <summary>\r
1411         ///     Cleans up any resources that may have been created by the alternator.\r
1412         /// </summary>\r
1413         ~alternator()\r
1414         {\r
1415             // Remove all links\r
1416             remove_network_links();\r
1417         }\r
1418 \r
1419     protected:\r
1420 \r
1421         /// <summary>\r
1422         ///     The main propagate() function for ITarget blocks.  Called by a source\r
1423         ///     block, generally within an asynchronous task to send messages to its targets.\r
1424         /// </summary>\r
1425         /// <param name="_PMessage">\r
1426         ///     A pointer to the message\r
1427         /// </param>\r
1428         /// <param name="_PSource">\r
1429         ///     A pointer to the source block offering the message.\r
1430         /// </param>\r
1431         /// <returns>\r
1432         ///     An indication of what the target decided to do with the message.\r
1433         /// </returns>\r
1434         /// <remarks>\r
1435         ///     It is important that calls to propagate do *not* take the same lock on the\r
1436         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
1437         ///     result in a deadlock with the Consume call.  (in the case of the alternator,\r
1438         ///     this lock is the m_internalLock)\r
1439         /// </remarks>\r
1440         virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
1441         {\r
1442             message_status _Result = accepted;\r
1443             //\r
1444             // Accept the message being propagated\r
1445             // Note: depending on the source block propagating the message\r
1446             // this may not necessarily be the same message (pMessage) first\r
1447             // passed into the function.\r
1448             //\r
1449             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
1450 \r
1451             if (_PMessage != NULL)\r
1452             {\r
1453                 async_send(_PMessage);\r
1454             }\r
1455             else\r
1456             {\r
1457                 _Result = missed;\r
1458             }\r
1459 \r
1460             return _Result;\r
1461         }\r
1462 \r
1463         /// <summary>\r
1464         ///     Synchronously sends a message to this block.  When this function completes the message will\r
1465         ///     already have propagated into the block.\r
1466         /// </summary>\r
1467         /// <param name="_PMessage">\r
1468         ///     A pointer to the message.\r
1469         /// </param>\r
1470         /// <param name="_PSource">\r
1471         ///     A pointer to the source block offering the message.\r
1472         /// </param>\r
1473         /// <returns>\r
1474         ///     An indication of what the target decided to do with the message.\r
1475         /// </returns>\r
1476         virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)\r
1477         {\r
1478             _PMessage = _PSource->accept(_PMessage->msg_id(), this);\r
1479 \r
1480             if (_PMessage != NULL)\r
1481             {\r
1482                 sync_send(_PMessage);\r
1483             }\r
1484             else\r
1485             {\r
1486                 return missed;\r
1487             }\r
1488 \r
1489             return accepted;\r
1490         }\r
1491 \r
1492         /// <summary>\r
1493         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
1494         /// </summary>\r
1495         /// <param name="_MsgId">\r
1496         ///     The runtime object identity of the message.\r
1497         /// </param>\r
1498         /// <returns>\r
1499         ///     A pointer to the message that the caller now has ownership of.\r
1500         /// </returns>\r
1501         virtual message<_Type> * accept_message(runtime_object_identity _MsgId)\r
1502         {\r
1503             //\r
1504             // Peek at the head message in the message buffer.  If the Ids match\r
1505             // dequeue and transfer ownership\r
1506             //\r
1507             message<_Type> * _Msg = NULL;\r
1508 \r
1509             if (_M_messageBuffer.is_head(_MsgId))\r
1510             {\r
1511                 _Msg = _M_messageBuffer.dequeue();\r
1512             }\r
1513 \r
1514             return _Msg;\r
1515         }\r
1516 \r
1517         /// <summary>\r
1518         ///     Reserves a message previously offered by the source.\r
1519         /// </summary>\r
1520         /// <param name="_MsgId">\r
1521         ///     The runtime object identity of the message.\r
1522         /// </param>\r
1523         /// <returns>\r
1524         ///     A Boolean indicating whether the reservation worked or not.\r
1525         /// </returns>\r
1526         /// <remarks>\r
1527         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
1528         /// </remarks>\r
1529         virtual bool reserve_message(runtime_object_identity _MsgId)\r
1530         {\r
1531             // Allow reservation if this is the head message\r
1532             return _M_messageBuffer.is_head(_MsgId);\r
1533         }\r
1534 \r
1535         /// <summary>\r
1536         ///     Consumes a message that was reserved previously.\r
1537         /// </summary>\r
1538         /// <param name="_MsgId">\r
1539         ///     The runtime object identity of the message.\r
1540         /// </param>\r
1541         /// <returns>\r
1542         ///     A pointer to the message that the caller now has ownership of.\r
1543         /// </returns>\r
1544         /// <remarks>\r
1545         ///     Similar to 'accept', but is always preceded by a call to 'reserve'.\r
1546         /// </remarks>\r
1547         virtual message<_Type> * consume_message(runtime_object_identity _MsgId)\r
1548         {\r
1549             // Update so we don't offer to this target again until\r
1550             // all others have a chance.\r
1551             target_iterator _CurrentIter = _M_connectedTargets.begin();\r
1552             for(size_t i = 0;*_CurrentIter != NULL; ++_CurrentIter, ++i) \r
1553             {\r
1554                 if(*_CurrentIter == _M_pReservedFor)\r
1555                 {\r
1556                     _M_indexNextTarget = i + 1;\r
1557                     break;\r
1558                 }\r
1559             }\r
1560 \r
1561             // By default, accept the message\r
1562             return accept_message(_MsgId);\r
1563         }\r
1564 \r
1565         /// <summary>\r
1566         ///     Releases a previous message reservation.\r
1567         /// </summary>\r
1568         /// <param name="_MsgId">\r
1569         ///     The runtime object identity of the message.\r
1570         /// </param>\r
1571         virtual void release_message(runtime_object_identity _MsgId)\r
1572         {\r
1573             // The head message is the one reserved.\r
1574             if (!_M_messageBuffer.is_head(_MsgId))\r
1575             {\r
1576                 throw message_not_found();\r
1577             }\r
1578         }\r
1579 \r
1580         /// <summary>\r
1581         ///    Resumes propagation after a reservation has been released.\r
1582         /// </summary>\r
1583         virtual void resume_propagation()\r
1584         {\r
1585             // If there are any messages in the buffer, propagate them out\r
1586             if (_M_messageBuffer.count() > 0)\r
1587             {\r
1588                 async_send(NULL);\r
1589             }\r
1590         }\r
1591 \r
1592         /// <summary>\r
1593         ///     Notification that a target was linked to this source.\r
1594         /// </summary>\r
1595         /// <param name="_PTarget">\r
1596         ///     A pointer to the newly linked target.\r
1597         /// </param>\r
1598         virtual void link_target_notification(ITarget<_Type> * _PTarget)\r
1599         {\r
1600             // If the message queue is blocked due to reservation\r
1601             // there is no need to do any message propagation\r
1602             if (_M_pReservedFor != NULL)\r
1603             {\r
1604                 return;\r
1605             }\r
1606 \r
1607             message<_Type> * _Msg = _M_messageBuffer.peek();\r
1608 \r
1609             if (_Msg != NULL)\r
1610             {\r
1611                 // Propagate the head message to the new target\r
1612                 message_status _Status = _PTarget->propagate(_Msg, this);\r
1613 \r
1614                 if (_Status == accepted)\r
1615                 {\r
1616                     // The target accepted the message, restart propagation.\r
1617                     propagate_to_any_targets(NULL);\r
1618                 }\r
1619 \r
1620                 // If the status is anything other than accepted, then leave\r
1621                 // the message queue blocked.\r
1622             }\r
1623         }\r
1624 \r
1625         /// <summary>\r
1626         ///     Takes the message and propagates it to all the targets of this alternator.\r
1627         ///     This is called from async_send.\r
1628         /// </summary>\r
1629         /// <param name="_PMessage">\r
1630         ///     A pointer to a new message.\r
1631         /// </param>\r
1632         virtual void propagate_to_any_targets(message<_Type> * _PMessage)\r
1633         {\r
1634             // Enqueue pMessage to the internal buffer queue if it is non-NULL.\r
1635             // pMessage can be NULL if this LWT was the result of a Repropagate call\r
1636             // out of a Consume or Release (where no new message is queued up, but\r
1637             // everything remaining in the unbounded buffer needs to be propagated out)\r
1638             if (_PMessage != NULL)\r
1639             {\r
1640                 _M_messageBuffer.enqueue(_PMessage);\r
1641 \r
1642                 // If the incoming pMessage is not the head message, we can safely assume that\r
1643                 // the head message is blocked and waiting on Consume(), Release() or a new\r
1644                 // link_target()\r
1645                 if (!_M_messageBuffer.is_head(_PMessage->msg_id()))\r
1646                 {\r
1647                     return;\r
1648                 }\r
1649             }\r
1650 \r
1651             // Attempt to propagate messages to targets in order last left off.\r
1652             _Propagate_alternating_order();\r
1653         }\r
1654 \r
1655         /// <summary>\r
1656         ///     Offers messages to targets in alternating order to help distribute messages\r
1657         ///     evenly among targets.\r
1658         /// </summary>\r
1659         void _Propagate_alternating_order()\r
1660         {\r
1661             message<_Target_type> * _Msg = _M_messageBuffer.peek();\r
1662 \r
1663             // If someone has reserved the _Head message, don't propagate anymore\r
1664             if (_M_pReservedFor != NULL)\r
1665             {\r
1666                 return;\r
1667             }\r
1668 \r
1669             //\r
1670             // Try to start where left off before, if the link has been removed\r
1671             // or this is the first time then start at the beginning.\r
1672             //\r
1673             target_iterator _CurrentIter = _M_connectedTargets.begin();\r
1674             const target_iterator _FirstLinkIter(_CurrentIter);\r
1675             for(size_t i = 0;*_CurrentIter != NULL && i < _M_indexNextTarget; ++_CurrentIter, ++i) {}\r
1676 \r
1677             while (_Msg != NULL)\r
1678             {\r
1679                 message_status _Status = declined;\r
1680 \r
1681                 // Loop offering message until end of links is reached.\r
1682                 target_iterator _StartedIter(_CurrentIter);\r
1683                 for(;*_CurrentIter != NULL; ++_CurrentIter)\r
1684                 {\r
1685                     _Status = (*_CurrentIter)->propagate(_Msg, this);\r
1686                     ++_M_indexNextTarget;\r
1687 \r
1688                     // Ownership of message changed. Do not propagate this\r
1689                     // message to any other target.\r
1690                     if (_Status == accepted)\r
1691                     {\r
1692                         ++_CurrentIter;\r
1693                         break;\r
1694                     }\r
1695 \r
1696                     // If the target just propagated to reserved this message, stop\r
1697                     // propagating it to others\r
1698                     if (_M_pReservedFor != NULL)\r
1699                     {\r
1700                         return;\r
1701                     }\r
1702                 }\r
1703 \r
1704                 // Message ownership changed go to next messages.\r
1705                 if (_Status == accepted)\r
1706                 {\r
1707                     continue;\r
1708                 }\r
1709 \r
1710                 // Try starting from the beginning until the first link offering was started at.\r
1711                 _M_indexNextTarget = 0;\r
1712                 for(_CurrentIter = _FirstLinkIter;*_CurrentIter != NULL; ++_CurrentIter)\r
1713                 {\r
1714                     // I have offered the same message to all links now so stop.\r
1715                     if(*_CurrentIter == *_StartedIter)\r
1716                     {\r
1717                         break;\r
1718                     }\r
1719 \r
1720                     _Status = (*_CurrentIter)->propagate(_Msg, this);\r
1721                     ++_M_indexNextTarget;\r
1722 \r
1723                     // Ownership of message changed. Do not propagate this\r
1724                     // message to any other target.\r
1725                     if (_Status == accepted)\r
1726                     {\r
1727                         ++_CurrentIter;\r
1728                         break;\r
1729                     }\r
1730 \r
1731                     // If the target just propagated to reserved this message, stop\r
1732                     // propagating it to others\r
1733                     if (_M_pReservedFor != NULL)\r
1734                     {\r
1735                         return;\r
1736                     }\r
1737                 }\r
1738 \r
1739                 // If status is anything other than accepted, then the head message\r
1740                 // was not propagated out.  Thus, nothing after it in the queue can\r
1741                 // be propagated out.  Cease propagation.\r
1742                 if (_Status != accepted)\r
1743                 {\r
1744                     break;\r
1745                 }\r
1746 \r
1747                 // Get the next message\r
1748                 _Msg = _M_messageBuffer.peek();\r
1749             }\r
1750         }\r
1751 \r
1752     private:\r
1753 \r
1754         /// <summary>\r
1755         ///     Message queue used to store messages.\r
1756         /// </summary>\r
1757         MessageQueue<_Type> _M_messageBuffer;\r
1758 \r
1759         /// <summary>\r
1760         ///     Index of next target to call propagate on. Used to alternate and load\r
1761         ///     balance message offering.\r
1762         /// </summary>\r
1763         size_t _M_indexNextTarget;\r
1764 \r
1765         //\r
1766         // Hide assignment operator and copy constructor.\r
1767         //\r
1768         alternator const &operator =(alternator const&);  // no assignment operator\r
1769         alternator(alternator const &);                   // no copy constructor\r
1770     };\r
1771 \r
1772     #include <agents.h>\r
1773         \r
1774     //\r
1775     // Sample block that combines join and transform.\r
1776     //\r
1777     template<class _Input, class _Output, join_type _Jtype = non_greedy>\r
1778     class join_transform : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
1779     {\r
1780     public:\r
1781 \r
1782         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
1783 \r
1784         /// <summary>\r
1785         ///     Create an join block within the default scheduler, and places it any schedule\r
1786         ///     group of the scheduler\92s choosing.\r
1787         /// </summary>\r
1788         /// <param name="_NumInputs">\r
1789         ///     The number of inputs this join will be allowed\r
1790         /// </param>\r
1791         join_transform(size_t _NumInputs, _Transform_method const& _Func)\r
1792             : _M_messageArray(_NumInputs, 0),\r
1793               _M_savedMessageIdArray(_NumInputs, -1),\r
1794               _M_pFunc(_Func)\r
1795         {\r
1796             _Initialize(_NumInputs);\r
1797         }\r
1798 \r
1799         /// <summary>\r
1800         ///     Create an join block within the default scheduler, and places it any schedule\r
1801         ///     group of the scheduler\92s choosing.\r
1802         /// </summary>\r
1803         /// <param name="_NumInputs">\r
1804         ///     The number of inputs this join will be allowed\r
1805         /// </param>\r
1806         /// <param name="_Filter">\r
1807         ///     A filter method placed on this join\r
1808         /// </param>\r
1809         join_transform(size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1810             : _M_messageArray(_NumInputs, 0),\r
1811               _M_savedMessageIdArray(_NumInputs, -1),\r
1812               _M_pFunc(_Func)\r
1813         {\r
1814             _Initialize(_NumInputs);\r
1815             register_filter(_Filter);\r
1816         }\r
1817 \r
1818         /// <summary>\r
1819         ///     Create an join block within the specified scheduler, and places it any schedule\r
1820         ///     group of the scheduler\92s choosing.\r
1821         /// </summary>\r
1822         /// <param name="_Scheduler">\r
1823         ///     The scheduler onto which the task's message propagation will be scheduled.\r
1824         /// </param>\r
1825         /// <param name="_NumInputs">\r
1826         ///     The number of inputs this join will be allowed\r
1827         /// </param>\r
1828         join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func)\r
1829             : _M_messageArray(_NumInputs, 0),\r
1830               _M_savedMessageIdArray(_NumInputs, -1),\r
1831               _M_pFunc(_Func)\r
1832         {\r
1833             _Initialize(_NumInputs, &_PScheduler);\r
1834         }\r
1835 \r
1836         /// <summary>\r
1837         ///     Create an join block within the specified scheduler, and places it any schedule\r
1838         ///     group of the scheduler\92s choosing.\r
1839         /// </summary>\r
1840         /// <param name="_Scheduler">\r
1841         ///     The scheduler onto which the task's message propagation will be scheduled.\r
1842         /// </param>\r
1843         /// <param name="_NumInputs">\r
1844         ///     The number of inputs this join will be allowed\r
1845         /// </param>\r
1846         /// <param name="_Filter">\r
1847         ///     A filter method placed on this join\r
1848         /// </param>\r
1849         join_transform(Scheduler& _PScheduler, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1850             : _M_messageArray(_NumInputs, 0),\r
1851               _M_savedMessageIdArray(_NumInputs, -1),\r
1852               _M_pFunc(_Func)\r
1853         {\r
1854             _Initialize(_NumInputs, &_PScheduler);\r
1855             register_filter(_Filter);\r
1856         }\r
1857 \r
1858         /// <summary>\r
1859         ///     Create an join block within the specified schedule group.  The scheduler is implied\r
1860         ///     by the schedule group.\r
1861         /// </summary>\r
1862         /// <param name="_PScheduleGroup">\r
1863         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
1864         /// </param>\r
1865         /// <param name="_NumInputs">\r
1866         ///     The number of inputs this join will be allowed\r
1867         /// </param>\r
1868         join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func)\r
1869             : _M_messageArray(_NumInputs, 0),\r
1870               _M_savedMessageIdArray(_NumInputs, -1),\r
1871               _M_pFunc(_Func)\r
1872         {\r
1873             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
1874         }\r
1875 \r
1876         /// <summary>\r
1877         ///     Create an join block within the specified schedule group.  The scheduler is implied\r
1878         ///     by the schedule group.\r
1879         /// </summary>\r
1880         /// <param name="_PScheduleGroup">\r
1881         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
1882         /// </param>\r
1883         /// <param name="_NumInputs">\r
1884         ///     The number of inputs this join will be allowed\r
1885         /// </param>\r
1886         /// <param name="_Filter">\r
1887         ///     A filter method placed on this join\r
1888         /// </param>\r
1889         join_transform(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, _Transform_method const& _Func, filter_method const& _Filter)\r
1890             : _M_messageArray(_NumInputs, 0),\r
1891               _M_savedMessageIdArray(_NumInputs, -1),\r
1892               _M_pFunc(_Func)\r
1893         {\r
1894             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
1895             register_filter(_Filter);\r
1896         }\r
1897 \r
1898         /// <summary>\r
1899         ///     Destroys a join\r
1900         /// </summary>\r
1901         ~join_transform()\r
1902         {\r
1903             // Remove all links that are targets of this join\r
1904             remove_network_links();\r
1905 \r
1906             delete [] _M_savedIdBuffer;\r
1907         }\r
1908 \r
1909     protected:\r
1910         //\r
1911         // propagator_block protected function implementations\r
1912         //\r
1913 \r
1914         /// <summary>\r
1915         ///     The main propagate() function for ITarget blocks.  Called by a source\r
1916         ///     block, generally within an asynchronous task to send messages to its targets.\r
1917         /// </summary>\r
1918         /// <param name="_PMessage">\r
1919         ///     The message being propagated\r
1920         /// </param>\r
1921         /// <param name="_PSource">\r
1922         ///     The source doing the propagation\r
1923         /// </param>\r
1924         /// <returns>\r
1925         ///     An indication of what the target decided to do with the message.\r
1926         /// </returns>\r
1927         /// <remarks>\r
1928         ///     It is important that calls to propagate do *not* take the same lock on the\r
1929         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
1930         ///     result in a deadlock with the Consume call. \r
1931         /// </remarks>\r
1932         message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) \r
1933         {\r
1934             message_status _Ret_val = accepted;\r
1935 \r
1936             //\r
1937             // Find the slot index of this source\r
1938             //\r
1939             size_t _Slot = 0;\r
1940             bool _Found = false;\r
1941             for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)\r
1942             {\r
1943                 if (*_Iter == _PSource)\r
1944                 {\r
1945                     _Found = true;\r
1946                     break;\r
1947                 }\r
1948 \r
1949                 _Slot++;\r
1950             }\r
1951 \r
1952             if (!_Found)\r
1953             {\r
1954                 // If this source was not found in the array, this is not a connected source\r
1955                 // decline the message\r
1956                 return declined;\r
1957             }\r
1958 \r
1959             _ASSERTE(_Slot < _M_messageArray.size());\r
1960 \r
1961             bool fIsGreedy = (_Jtype == greedy);\r
1962 \r
1963             if (fIsGreedy)\r
1964             {\r
1965                 //\r
1966                 // Greedy type joins immediately accept the message.\r
1967                 //\r
1968                 {\r
1969                     critical_section::scoped_lock lockHolder(_M_propagationLock);\r
1970                     if (_M_messageArray[_Slot] != NULL)\r
1971                     {\r
1972                         _M_savedMessageIdArray[_Slot] = _PMessage->msg_id();\r
1973                         _Ret_val = postponed;\r
1974                     }\r
1975                 }\r
1976 \r
1977                 if (_Ret_val != postponed)\r
1978                 {\r
1979                     _M_messageArray[_Slot] = _PSource->accept(_PMessage->msg_id(), this);\r
1980 \r
1981                     if (_M_messageArray[_Slot] != NULL)\r
1982                     {\r
1983                         if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
1984                         {\r
1985                             // If messages have arrived on all links, start a propagation\r
1986                             // of the current message\r
1987                             async_send(NULL);\r
1988                         }\r
1989                     }\r
1990                     else\r
1991                     {\r
1992                         _Ret_val = missed;\r
1993                     }\r
1994                 }\r
1995             }\r
1996             else\r
1997             {\r
1998                 //\r
1999                 // Non-greedy type joins save the message ids until they have all arrived\r
2000                 //\r
2001 \r
2002                 if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)\r
2003                 {\r
2004                     // Decrement the message remaining count if this thread is switching \r
2005                     // the saved id from -1 to a valid value.\r
2006                     if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
2007                     {\r
2008                         async_send(NULL);\r
2009                     }\r
2010                 }\r
2011 \r
2012                 // Always return postponed.  This message will be consumed\r
2013                 // in the LWT\r
2014                 _Ret_val = postponed;\r
2015             }\r
2016 \r
2017             return _Ret_val;\r
2018         }\r
2019 \r
2020         /// <summary>\r
2021         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
2022         /// </summary>\r
2023         /// <param name="_MsgId">\r
2024         ///     The runtime object identity of the message.\r
2025         /// </param>\r
2026         /// <returns>\r
2027         ///     A pointer to the message that the caller now has ownership of.\r
2028         /// </returns>\r
2029         virtual message<_Output> * accept_message(runtime_object_identity _MsgId)\r
2030         {\r
2031             //\r
2032             // Peek at the head message in the message buffer.  If the Ids match\r
2033             // dequeue and transfer ownership\r
2034             //\r
2035             message<_Output> * _Msg = NULL;\r
2036 \r
2037             if (_M_messageBuffer.is_head(_MsgId))\r
2038             {\r
2039                 _Msg = _M_messageBuffer.dequeue();\r
2040             }\r
2041 \r
2042             return _Msg;\r
2043         }\r
2044 \r
2045         /// <summary>\r
2046         ///     Reserves a message previously offered by the source.\r
2047         /// </summary>\r
2048         /// <param name="_MsgId">\r
2049         ///     The runtime object identity of the message.\r
2050         /// </param>\r
2051         /// <returns>\r
2052         ///     A bool indicating whether the reservation worked or not\r
2053         /// </returns>\r
2054         /// <remarks>\r
2055         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
2056         /// </remarks>\r
2057         virtual bool reserve_message(runtime_object_identity _MsgId)\r
2058         {\r
2059             // Allow reservation if this is the head message\r
2060             return _M_messageBuffer.is_head(_MsgId);\r
2061         }\r
2062 \r
2063         /// <summary>\r
2064         ///     Consumes a message previously offered by the source and reserved by the target, \r
2065         ///     transferring ownership to the caller.\r
2066         /// </summary>\r
2067         /// <param name="_MsgId">\r
2068         ///     The runtime object identity of the message.\r
2069         /// </param>\r
2070         /// <returns>\r
2071         ///     A pointer to the message that the caller now has ownership of.\r
2072         /// </returns>\r
2073         /// <remarks>\r
2074         ///     Similar to 'accept', but is always preceded by a call to 'reserve'\r
2075         /// </remarks>\r
2076         virtual message<_Output> * consume_message(runtime_object_identity _MsgId)\r
2077         {\r
2078             // By default, accept the message\r
2079             return accept_message(_MsgId);\r
2080         }\r
2081 \r
2082         /// <summary>\r
2083         ///     Releases a previous message reservation.\r
2084         /// </summary>\r
2085         /// <param name="_MsgId">\r
2086         ///     The runtime object identity of the message.\r
2087         /// </param>\r
2088         virtual void release_message(runtime_object_identity _MsgId)\r
2089         {\r
2090             // The head message is the one reserved.\r
2091             if (!_M_messageBuffer.is_head(_MsgId))\r
2092             {\r
2093                 throw message_not_found();\r
2094             }\r
2095         }\r
2096 \r
2097         /// <summary>\r
2098         ///     Resumes propagation after a reservation has been released\r
2099         /// </summary>\r
2100         virtual void resume_propagation()\r
2101         {\r
2102             // If there are any messages in the buffer, propagate them out\r
2103             if (_M_messageBuffer.count() > 0)\r
2104             {\r
2105                 async_send(NULL);\r
2106             }\r
2107         }\r
2108 \r
2109         /// <summary>\r
2110         ///     Notification that a target was linked to this source.\r
2111         /// </summary>\r
2112         /// <param name="_PTarget">\r
2113         ///     A pointer to the newly linked target.\r
2114         /// </param>\r
2115         virtual void link_target_notification(ITarget<_Output> *)\r
2116         {\r
2117             // If the message queue is blocked due to reservation\r
2118             // there is no need to do any message propagation\r
2119             if (_M_pReservedFor != NULL)\r
2120             {\r
2121                 return;\r
2122             }\r
2123 \r
2124             _Propagate_priority_order(_M_messageBuffer);\r
2125         }\r
2126 \r
2127         /// <summary>\r
2128         ///     Takes the message and propagates it to all the target of this join.\r
2129         ///     This is called from async_send.\r
2130         /// </summary>\r
2131         /// <param name="_PMessage">\r
2132         ///     The message being propagated\r
2133         /// </param>\r
2134         void propagate_to_any_targets(message<_Output> *) \r
2135         {\r
2136             message<_Output> * _Msg = NULL;\r
2137             // Create a new message from the input sources\r
2138             // If messagesRemaining == 0, we have a new message to create.  Otherwise, this is coming from\r
2139             // a consume or release from the target.  In that case we don't want to create a new message.\r
2140             if (_M_messagesRemaining == 0)\r
2141             {\r
2142                 // A greedy join can immediately create the message, a non-greedy\r
2143                 // join must try and consume all the messages it has postponed\r
2144                 _Msg = _Create_new_message();\r
2145             }\r
2146 \r
2147             if (_Msg == NULL)\r
2148             {\r
2149                 // Create message failed.  This happens in non_greedy joins when the\r
2150                 // reserve/consumption of a postponed message failed.\r
2151                 _Propagate_priority_order(_M_messageBuffer);\r
2152                 return;\r
2153             }\r
2154 \r
2155             bool fIsGreedy = (_Jtype == greedy);\r
2156 \r
2157             // For a greedy join, reset the number of messages remaining\r
2158             // Check to see if multiple messages have been passed in on any of the links,\r
2159             // and postponed. If so, try and reserve/consume them now\r
2160             if (fIsGreedy)\r
2161             {\r
2162                 // Look at the saved ids and reserve/consume any that have passed in while\r
2163                 // this join was waiting to complete\r
2164                 _ASSERTE(_M_messageArray.size() == _M_savedMessageIdArray.size());\r
2165 \r
2166                 for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2167                 {\r
2168                     for(;;)\r
2169                     {\r
2170                         runtime_object_identity _Saved_id;\r
2171                         // Grab the current saved id value.  This value could be changing from based on any\r
2172                         // calls of source->propagate(this).  If the message id is different than what is snapped\r
2173                         // here, that means, the reserve below must fail.  This is because reserve is trying\r
2174                         // to get the same source lock the propagate(this) call must be holding.\r
2175                         {\r
2176                             critical_section::scoped_lock lockHolder(_M_propagationLock);\r
2177 \r
2178                             _ASSERTE(_M_messageArray[i] != NULL);\r
2179 \r
2180                             _Saved_id = _M_savedMessageIdArray[i];\r
2181 \r
2182                             if (_Saved_id == -1)\r
2183                             {\r
2184                                 _M_messageArray[i] = NULL;\r
2185                                 break;\r
2186                             }\r
2187                             else\r
2188                             {\r
2189                                 _M_savedMessageIdArray[i] = -1;\r
2190                             }\r
2191                         }\r
2192 \r
2193                         if (_Saved_id != -1)\r
2194                         {\r
2195                             source_iterator _Iter = _M_connectedSources.begin();\r
2196                             \r
2197                             ISource<_Input> * _PSource = _Iter[i];\r
2198                             if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))\r
2199                             {\r
2200                                 _M_messageArray[i] = _PSource->consume(_Saved_id, this);\r
2201                                 _InterlockedDecrementSizeT(&_M_messagesRemaining);\r
2202                                 break;\r
2203                             }\r
2204                         }\r
2205                     }\r
2206                 }\r
2207 \r
2208                 // If messages have all been received, async_send again, this will start the\r
2209                 // LWT up to create a new message\r
2210                 if (_M_messagesRemaining == 0)\r
2211                 {\r
2212                     async_send(NULL);\r
2213                 }\r
2214             }\r
2215             \r
2216             // Add the new message to the outbound queue\r
2217             _M_messageBuffer.enqueue(_Msg);\r
2218 \r
2219             if (!_M_messageBuffer.is_head(_Msg->msg_id()))\r
2220             {\r
2221                 // another message is at the head of the outbound message queue and blocked\r
2222                 // simply return\r
2223                 return;\r
2224             }\r
2225 \r
2226             _Propagate_priority_order(_M_messageBuffer);\r
2227         }\r
2228 \r
2229     private:\r
2230 \r
2231         //\r
2232         //  Private Methods\r
2233         //\r
2234 \r
2235         /// <summary>\r
2236         ///     Propagate messages in priority order\r
2237         /// </summary>\r
2238         /// <param name="_MessageBuffer">\r
2239         ///     Reference to a message queue with messages to be propagated\r
2240         /// </param>\r
2241         void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
2242         {\r
2243             message<_Output> * _Msg = _MessageBuffer.peek();\r
2244 \r
2245             // If someone has reserved the _Head message, don't propagate anymore\r
2246             if (_M_pReservedFor != NULL)\r
2247             {\r
2248                 return;\r
2249             }\r
2250 \r
2251             while (_Msg != NULL)\r
2252             {\r
2253                 message_status _Status = declined;\r
2254 \r
2255                 // Always start from the first target that linked\r
2256                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
2257                 {\r
2258                     ITarget<_Output> * _PTarget = *_Iter;\r
2259                     _Status = _PTarget->propagate(_Msg, this);\r
2260 \r
2261                     // Ownership of message changed. Do not propagate this\r
2262                     // message to any other target.\r
2263                     if (_Status == accepted)\r
2264                     {\r
2265                         break;\r
2266                     }\r
2267 \r
2268                     // If the target just propagated to reserved this message, stop\r
2269                     // propagating it to others\r
2270                     if (_M_pReservedFor != NULL)\r
2271                     {\r
2272                         break;\r
2273                     }\r
2274                 }\r
2275 \r
2276                 // If status is anything other than accepted, then the head message\r
2277                 // was not propagated out.  Thus, nothing after it in the queue can\r
2278                 // be propagated out.  Cease propagation.\r
2279                 if (_Status != accepted)\r
2280                 {\r
2281                     break;\r
2282                 }\r
2283 \r
2284                 // Get the next message\r
2285                 _Msg = _MessageBuffer.peek();\r
2286             }\r
2287         }\r
2288 \r
2289         /// <summary>\r
2290         ///     Create a new message from the data output\r
2291         /// </summary>\r
2292         /// <returns>\r
2293         ///     The created message (NULL if creation failed)\r
2294         /// </returns>\r
2295         message<_Output> * __cdecl _Create_new_message()\r
2296         {\r
2297             bool fIsNonGreedy = (_Jtype == non_greedy);\r
2298 \r
2299             // If this is a non-greedy join, check each source and try to consume their message\r
2300             if (fIsNonGreedy)\r
2301             {\r
2302 \r
2303                 // The iterator _Iter below will ensure that it is safe to touch\r
2304                 // non-NULL source pointers. Take a snapshot.\r
2305                 std::vector<ISource<_Input> *> _Sources;\r
2306                 source_iterator _Iter = _M_connectedSources.begin();\r
2307 \r
2308                 while (*_Iter != NULL)\r
2309                 {\r
2310                     ISource<_Input> * _PSource = *_Iter;\r
2311 \r
2312                     if (_PSource == NULL)\r
2313                     {\r
2314                         break;\r
2315                     }\r
2316 \r
2317                     _Sources.push_back(_PSource);\r
2318                     ++_Iter;\r
2319                 }\r
2320 \r
2321                 if (_Sources.size() != _M_messageArray.size())\r
2322                 {\r
2323                     // Some of the sources were unlinked. The join is broken\r
2324                     return NULL;\r
2325                 }\r
2326 \r
2327                 // First, try and reserve all the messages.  If a reservation fails,\r
2328                 // then release any reservations that had been made.\r
2329                 for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)\r
2330                 {\r
2331                     // Snap the current saved id into a buffer.  This value can be changing behind the scenes from\r
2332                     // other source->propagate(msg, this) calls, but if so, that just means the reserve below will\r
2333                     // fail.\r
2334                     _InterlockedIncrementSizeT(&_M_messagesRemaining);\r
2335                     _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);\r
2336 \r
2337                     _ASSERTE(_M_savedIdBuffer[i] != -1);\r
2338 \r
2339                     if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))\r
2340                     {\r
2341                         // A reservation failed, release all reservations made up until\r
2342                         // this block, and wait for another message to arrive on this link\r
2343                         for (size_t j = 0; j < i; j++)\r
2344                         {\r
2345                             _Sources[j]->release(_M_savedIdBuffer[j], this);\r
2346                             if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1) == -1)\r
2347                             {\r
2348                                 if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0)\r
2349                                 {\r
2350                                     async_send(NULL);\r
2351                                 }\r
2352                             }\r
2353                         }\r
2354 \r
2355                         // Return NULL to indicate that the create failed\r
2356                         return NULL;\r
2357                     }  \r
2358                 }\r
2359 \r
2360                 // Since everything has been reserved, consume all the messages.\r
2361                 // This is guaranteed to return true.\r
2362                 for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2363                 {\r
2364                     _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);\r
2365                     _M_savedIdBuffer[i] = -1;\r
2366                 }\r
2367             }\r
2368 \r
2369             if (!fIsNonGreedy)\r
2370             {\r
2371                 // Reinitialize how many messages are being waited for.\r
2372                 // This is safe because all messages have been received, thus no new async_sends for\r
2373                 // greedy joins can be called.\r
2374                 _M_messagesRemaining = _M_messageArray.size();\r
2375             }\r
2376 \r
2377             std::vector<_Input> _OutputVector;\r
2378             for (size_t i = 0; i < _M_messageArray.size(); i++)\r
2379             {\r
2380                 _ASSERTE(_M_messageArray[i] != NULL);\r
2381                 _OutputVector.push_back(_M_messageArray[i]->payload);\r
2382 \r
2383                 delete _M_messageArray[i];\r
2384                 if (fIsNonGreedy)\r
2385                 {\r
2386                     _M_messageArray[i] = NULL;\r
2387                 }\r
2388             }\r
2389 \r
2390             _Output _Out = _M_pFunc(_OutputVector);\r
2391 \r
2392             return (new message<_Output>(_Out));\r
2393         }\r
2394 \r
2395         /// <summary>\r
2396         ///     Initialize the join block\r
2397         /// </summary>\r
2398         /// <param name="_NumInputs">\r
2399         ///     The number of inputs\r
2400         /// </param>\r
2401         void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
2402         {\r
2403             initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
2404 \r
2405             _M_connectedSources.set_bound(_NumInputs);\r
2406             _M_messagesRemaining = _NumInputs;\r
2407 \r
2408             bool fIsNonGreedy = (_Jtype == non_greedy);\r
2409 \r
2410             if (fIsNonGreedy)\r
2411             {\r
2412                 // Non greedy joins need a buffer to snap off saved message ids to.\r
2413                 _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
2414                 memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
2415             }\r
2416             else\r
2417             {\r
2418                 _M_savedIdBuffer = NULL;\r
2419             }\r
2420         }\r
2421 \r
2422         // The current number of messages remaining\r
2423         volatile size_t _M_messagesRemaining;\r
2424 \r
2425         // An array containing the accepted messages of this join.\r
2426         std::vector<message<_Input>*> _M_messageArray;\r
2427 \r
2428         // An array containing the msg ids of messages propagated to the array\r
2429         // For greedy joins, this contains a log of other messages passed to this\r
2430         // join after the first has been accepted\r
2431         // For non-greedy joins, this contains the message id of any message \r
2432         // passed to it.\r
2433         std::vector<runtime_object_identity> _M_savedMessageIdArray;\r
2434 \r
2435         // The transformer method called by this block\r
2436         _Transform_method _M_pFunc;\r
2437 \r
2438         // Buffer for snapping saved ids in non-greedy joins\r
2439         runtime_object_identity * _M_savedIdBuffer;\r
2440 \r
2441         // A lock for modifying the buffer or the connected blocks\r
2442         ::Concurrency::critical_section _M_propagationLock;\r
2443 \r
2444         // Queue to hold output messages\r
2445         MessageQueue<_Output> _M_messageBuffer;\r
2446     };\r
2447 \r
2448     //\r
2449     // Message block that invokes a transform method when it receives message on any of the input links.\r
2450     // A typical example is recal engine for a cell in an Excel spreadsheet.\r
2451     // (Remember that a normal join block is triggered only when it receives messages on all its input links).\r
2452     //\r
2453     template<class _Input, class _Output>\r
2454     class recalculate : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>\r
2455     {\r
2456     public:\r
2457         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Recalculate_method;\r
2458 \r
2459         /// <summary>\r
2460         ///     Create an recalculate block within the default scheduler, and places it any schedule\r
2461         ///     group of the scheduler\92s choosing.\r
2462         /// </summary>\r
2463         /// <param name="_NumInputs">\r
2464         ///     The number of inputs \r
2465         /// </param>\r
2466         recalculate(size_t _NumInputs, _Recalculate_method const& _Func)\r
2467             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2468               _M_pFunc(_Func)\r
2469         {\r
2470             _Initialize(_NumInputs);\r
2471         }\r
2472 \r
2473         /// <summary>\r
2474         ///     Create an recalculate block within the default scheduler, and places it any schedule\r
2475         ///     group of the scheduler\92s choosing.\r
2476         /// </summary>\r
2477         /// <param name="_NumInputs">\r
2478         ///     The number of inputs \r
2479         /// </param>\r
2480         /// <param name="_Filter">\r
2481         ///     A filter method placed on this join\r
2482         /// </param>\r
2483         recalculate(size_t _NumInputs, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2484             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2485               _M_pFunc(_Func)\r
2486         {\r
2487             _Initialize(_NumInputs);\r
2488             register_filter(_Filter);\r
2489         }\r
2490 \r
2491         /// <summary>\r
2492         ///     Create an recalculate block within the specified scheduler, and places it any schedule\r
2493         ///     group of the scheduler\92s choosing.\r
2494         /// </summary>\r
2495         /// <param name="_Scheduler">\r
2496         ///     The scheduler onto which the task's message propagation will be scheduled.\r
2497         /// </param>\r
2498         /// <param name="_NumInputs">\r
2499         ///     The number of inputs \r
2500         /// </param>\r
2501         recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func)\r
2502             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2503               _M_pFunc(_Func)\r
2504         {\r
2505             _Initialize(_NumInputs, &_PScheduler);\r
2506         }\r
2507 \r
2508         /// <summary>\r
2509         ///     Create an recalculate block within the specified scheduler, and places it any schedule\r
2510         ///     group of the scheduler\92s choosing.\r
2511         /// </summary>\r
2512         /// <param name="_Scheduler">\r
2513         ///     The scheduler onto which the task's message propagation will be scheduled.\r
2514         /// </param>\r
2515         /// <param name="_NumInputs">\r
2516         ///     The number of inputs \r
2517         /// </param>\r
2518         /// <param name="_Filter">\r
2519         ///     A filter method placed on this join\r
2520         /// </param>\r
2521         recalculate(size_t _NumInputs, Scheduler& _PScheduler, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2522             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2523               _M_pFunc(_Func)\r
2524         {\r
2525             _Initialize(_NumInputs, &_PScheduler);\r
2526             register_filter(_Filter);\r
2527         }\r
2528 \r
2529         /// <summary>\r
2530         ///     Create an recalculate block within the specified schedule group.  The scheduler is implied\r
2531         ///     by the schedule group.\r
2532         /// </summary>\r
2533         /// <param name="_PScheduleGroup">\r
2534         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
2535         /// </param>\r
2536         /// <param name="_NumInputs">\r
2537         ///     The number of inputs \r
2538         /// </param>\r
2539         recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func)\r
2540             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2541               _M_pFunc(_Func)\r
2542         {\r
2543             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
2544         }\r
2545 \r
2546         /// <summary>\r
2547         ///     Create an recalculate block within the specified schedule group.  The scheduler is implied\r
2548         ///     by the schedule group.\r
2549         /// </summary>\r
2550         /// <param name="_PScheduleGroup">\r
2551         ///     The ScheduleGroup onto which the task's message propagation will be scheduled.\r
2552         /// </param>\r
2553         /// <param name="_NumInputs">\r
2554         ///     The number of inputs \r
2555         /// </param>\r
2556         /// <param name="_Filter">\r
2557         ///     A filter method placed on this join\r
2558         /// </param>\r
2559         recalculate(size_t _NumInputs, ScheduleGroup& _PScheduleGroup, _Recalculate_method const& _Func, filter_method const& _Filter)\r
2560             : _M_messageArray(_NumInputs, 0), _M_savedMessageIdArray(_NumInputs, -1),\r
2561               _M_pFunc(_Func)\r
2562         {\r
2563             _Initialize(_NumInputs, NULL, &_PScheduleGroup);\r
2564             register_filter(_Filter);\r
2565         }\r
2566 \r
2567         /// <summary>\r
2568         ///     Destroys a join\r
2569         /// </summary>\r
2570         ~recalculate()\r
2571         {\r
2572             // Remove all links that are targets of this join\r
2573             remove_network_links();\r
2574 \r
2575             delete [] _M_savedIdBuffer;\r
2576         }\r
2577 \r
2578     protected:\r
2579         //\r
2580         // propagator_block protected function implementations\r
2581         //\r
2582 \r
2583         /// <summary>\r
2584         ///     The main propagate() function for ITarget blocks.  Called by a source\r
2585         ///     block, generally within an asynchronous task to send messages to its targets.\r
2586         /// </summary>\r
2587         /// <param name="_PMessage">\r
2588         ///     The message being propagated\r
2589         /// </param>\r
2590         /// <param name="_PSource">\r
2591         ///     The source doing the propagation\r
2592         /// </param>\r
2593         /// <returns>\r
2594         ///     An indication of what the target decided to do with the message.\r
2595         /// </returns>\r
2596         /// <remarks>\r
2597         ///     It is important that calls to propagate do *not* take the same lock on the\r
2598         ///     internal structure that is used by Consume and the LWT.  Doing so could\r
2599         ///     result in a deadlock with the Consume call. \r
2600         /// </remarks>\r
2601         message_status propagate_message(message<_Input> * _PMessage, ISource<_Input> * _PSource) \r
2602         {\r
2603             //\r
2604             // Find the slot index of this source\r
2605             //\r
2606             size_t _Slot = 0;\r
2607             bool _Found = false;\r
2608             for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)\r
2609             {\r
2610                 if (*_Iter == _PSource)\r
2611                 {\r
2612                     _Found = true;\r
2613                     break;\r
2614                 }\r
2615 \r
2616                 _Slot++;\r
2617             }\r
2618 \r
2619             if (!_Found)\r
2620             {\r
2621                 // If this source was not found in the array, this is not a connected source\r
2622                 // decline the message\r
2623                 return declined;\r
2624             }\r
2625 \r
2626             //\r
2627             // Save the message id\r
2628             //\r
2629             if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray[_Slot], _PMessage->msg_id()) == -1)\r
2630             {\r
2631                 // If it is not seen by Create_message attempt a recalculate\r
2632                 async_send(NULL);\r
2633             }\r
2634 \r
2635             // Always return postponed.  This message will be consumed\r
2636             // in the LWT\r
2637             return postponed;\r
2638         }\r
2639 \r
2640         /// <summary>\r
2641         ///     Accepts an offered message by the source, transferring ownership to the caller.\r
2642         /// </summary>\r
2643         /// <param name="_MsgId">\r
2644         ///     The runtime object identity of the message.\r
2645         /// </param>\r
2646         /// <returns>\r
2647         ///     A pointer to the message that the caller now has ownership of.\r
2648         /// </returns>\r
2649         virtual message<_Output> * accept_message(runtime_object_identity _MsgId)\r
2650         {\r
2651             //\r
2652             // Peek at the head message in the message buffer.  If the Ids match\r
2653             // dequeue and transfer ownership\r
2654             //\r
2655             message<_Output> * _Msg = NULL;\r
2656 \r
2657             if (_M_messageBuffer.is_head(_MsgId))\r
2658             {\r
2659                 _Msg = _M_messageBuffer.dequeue();\r
2660             }\r
2661 \r
2662             return _Msg;\r
2663         }\r
2664 \r
2665         /// <summary>\r
2666         ///     Reserves a message previously offered by the source.\r
2667         /// </summary>\r
2668         /// <param name="_MsgId">\r
2669         ///     The runtime object identity of the message.\r
2670         /// </param>\r
2671         /// <returns>\r
2672         ///     A bool indicating whether the reservation worked or not\r
2673         /// </returns>\r
2674         /// <remarks>\r
2675         ///     After 'reserve' is called, either 'consume' or 'release' must be called.\r
2676         /// </remarks>\r
2677         virtual bool reserve_message(runtime_object_identity _MsgId)\r
2678         {\r
2679             // Allow reservation if this is the head message\r
2680             return _M_messageBuffer.is_head(_MsgId);\r
2681         }\r
2682 \r
2683         /// <summary>\r
2684         ///     Consumes a message previously offered by the source and reserved by the target, \r
2685         ///     transferring ownership to the caller.\r
2686         /// </summary>\r
2687         /// <param name="_MsgId">\r
2688         ///     The runtime object identity of the message.\r
2689         /// </param>\r
2690         /// <returns>\r
2691         ///     A pointer to the message that the caller now has ownership of.\r
2692         /// </returns>\r
2693         /// <remarks>\r
2694         ///     Similar to 'accept', but is always preceded by a call to 'reserve'\r
2695         /// </remarks>\r
2696         virtual message<_Output> * consume_message(runtime_object_identity _MsgId)\r
2697         {\r
2698             // By default, accept the message\r
2699             return accept_message(_MsgId);\r
2700         }\r
2701 \r
2702         /// <summary>\r
2703         ///     Releases a previous message reservation.\r
2704         /// </summary>\r
2705         /// <param name="_MsgId">\r
2706         ///     The runtime object identity of the message.\r
2707         /// </param>\r
2708         virtual void release_message(runtime_object_identity _MsgId)\r
2709         {\r
2710             // The head message is the one reserved.\r
2711             if (!_M_messageBuffer.is_head(_MsgId))\r
2712             {\r
2713                 throw message_not_found();\r
2714             }\r
2715         }\r
2716 \r
2717         /// <summary>\r
2718         ///     Resumes propagation after a reservation has been released\r
2719         /// </summary>\r
2720         virtual void resume_propagation()\r
2721         {\r
2722             // If there are any messages in the buffer, propagate them out\r
2723             if (_M_messageBuffer.count() > 0)\r
2724             {\r
2725                 async_send(NULL);\r
2726             }\r
2727         }\r
2728 \r
2729         /// <summary>\r
2730         ///     Notification that a target was linked to this source.\r
2731         /// </summary>\r
2732         /// <param name="_PTarget">\r
2733         ///     A pointer to the newly linked target.\r
2734         /// </param>\r
2735         virtual void link_target_notification(ITarget<_Output> *)\r
2736         {\r
2737             // If the message queue is blocked due to reservation\r
2738             // there is no need to do any message propagation\r
2739             if (_M_pReservedFor != NULL)\r
2740             {\r
2741                 return;\r
2742             }\r
2743 \r
2744             _Propagate_priority_order(_M_messageBuffer);\r
2745         }\r
2746 \r
2747         /// <summary>\r
2748         ///     Takes the message and propagates it to all the target of this join.\r
2749         ///     This is called from async_send.\r
2750         /// </summary>\r
2751         /// <param name="_PMessage">\r
2752         ///     The message being propagated\r
2753         /// </param>\r
2754         void propagate_to_any_targets(message<_Output> *) \r
2755         {\r
2756             // Attempt to create a new message\r
2757             message<_Output> * _Msg = _Create_new_message();\r
2758 \r
2759             if (_Msg != NULL)\r
2760             {\r
2761                 // Add the new message to the outbound queue\r
2762                 _M_messageBuffer.enqueue(_Msg);\r
2763 \r
2764                 if (!_M_messageBuffer.is_head(_Msg->msg_id()))\r
2765                 {\r
2766                     // another message is at the head of the outbound message queue and blocked\r
2767                     // simply return\r
2768                     return;\r
2769                 }\r
2770             }\r
2771 \r
2772             _Propagate_priority_order(_M_messageBuffer);\r
2773         }\r
2774 \r
2775     private:\r
2776 \r
2777         //\r
2778         //  Private Methods\r
2779         //\r
2780 \r
2781         /// <summary>\r
2782         ///     Propagate messages in priority order\r
2783         /// </summary>\r
2784         /// <param name="_MessageBuffer">\r
2785         ///     Reference to a message queue with messages to be propagated\r
2786         /// </param>\r
2787         void _Propagate_priority_order(MessageQueue<_Output> & _MessageBuffer)\r
2788         {\r
2789             message<_Output> * _Msg = _MessageBuffer.peek();\r
2790 \r
2791             // If someone has reserved the _Head message, don't propagate anymore\r
2792             if (_M_pReservedFor != NULL)\r
2793             {\r
2794                 return;\r
2795             }\r
2796 \r
2797             while (_Msg != NULL)\r
2798             {\r
2799                 message_status _Status = declined;\r
2800 \r
2801                 // Always start from the first target that linked\r
2802                 for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)\r
2803                 {\r
2804                     ITarget<_Output> * _PTarget = *_Iter;\r
2805                     _Status = _PTarget->propagate(_Msg, this);\r
2806 \r
2807                     // Ownership of message changed. Do not propagate this\r
2808                     // message to any other target.\r
2809                     if (_Status == accepted)\r
2810                     {\r
2811                         break;\r
2812                     }\r
2813 \r
2814                     // If the target just propagated to reserved this message, stop\r
2815                     // propagating it to others\r
2816                     if (_M_pReservedFor != NULL)\r
2817                     {\r
2818                         break;\r
2819                     }\r
2820                 }\r
2821 \r
2822                 // If status is anything other than accepted, then the head message\r
2823                 // was not propagated out.  Thus, nothing after it in the queue can\r
2824                 // be propagated out.  Cease propagation.\r
2825                 if (_Status != accepted)\r
2826                 {\r
2827                     break;\r
2828                 }\r
2829 \r
2830                 // Get the next message\r
2831                 _Msg = _MessageBuffer.peek();\r
2832             }\r
2833         }\r
2834 \r
2835         /// <summary>\r
2836         ///     Create a new message from the data output\r
2837         /// </summary>\r
2838         /// <returns>\r
2839         ///     The created message (NULL if creation failed)\r
2840         /// </returns>\r
2841         message<_Output> * __cdecl _Create_new_message()\r
2842         {\r
2843             // If this is a non-greedy join, check each source and try to consume their message\r
2844             size_t _NumInputs = _M_savedMessageIdArray.size();\r
2845 \r
2846             // The iterator _Iter below will ensure that it is safe to touch\r
2847             // non-NULL source pointers. Take a snapshot.\r
2848             std::vector<ISource<_Input> *> _Sources;\r
2849             source_iterator _Iter = _M_connectedSources.begin();\r
2850 \r
2851             while (*_Iter != NULL)\r
2852             {\r
2853                 ISource<_Input> * _PSource = *_Iter;\r
2854 \r
2855                 if (_PSource == NULL)\r
2856                 {\r
2857                     break;\r
2858                 }\r
2859 \r
2860                 _Sources.push_back(_PSource);\r
2861                 ++_Iter;\r
2862             }\r
2863 \r
2864             if (_Sources.size() != _NumInputs)\r
2865             {\r
2866                 // Some of the sources were unlinked. The join is broken\r
2867                 return NULL;\r
2868             }\r
2869 \r
2870             // First, try and reserve all the messages.  If a reservation fails,\r
2871             // then release any reservations that had been made.\r
2872             for (size_t i = 0; i < _M_savedMessageIdArray.size(); i++)\r
2873             {\r
2874                 // Swap the id to -1 indicating that we have used that value for a recalculate\r
2875                 _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray[i], -1);\r
2876 \r
2877                 // If the id is -1, either we have never received a message on that link or the previous message is stored\r
2878                 // in the message array. If it is the former we abort. \r
2879                 // If the id is not -1, we attempt to reserve the message. On failure we abort.\r
2880                 if (((_M_savedIdBuffer[i] == -1) && (_M_messageArray[i] == NULL))\r
2881                     || ((_M_savedIdBuffer[i] != -1) && !_Sources[i]->reserve(_M_savedIdBuffer[i], this)))\r
2882                 {\r
2883                     // Abort. Release all reservations made up until this block, \r
2884                     // and wait for another message to arrive.\r
2885                     for (size_t j = 0; j < i; j++)\r
2886                     {\r
2887                         if (_M_savedIdBuffer[j] != -1)\r
2888                         {\r
2889                             _Sources[j]->release(_M_savedIdBuffer[j], this);\r
2890                             _InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray[j], _M_savedIdBuffer[j], -1);\r
2891                         }\r
2892                     }\r
2893 \r
2894                     // Return NULL to indicate that the create failed\r
2895                     return NULL;\r
2896                 }  \r
2897             }\r
2898 \r
2899             // Since everything has been reserved, consume all the messages.\r
2900             // This is guaranteed to return true.\r
2901             size_t _NewMessages = 0;\r
2902             for (size_t i = 0; i < _NumInputs; i++)\r
2903             {\r
2904                 if (_M_savedIdBuffer[i] != -1)\r
2905                 {\r
2906                     // Delete previous message since we have a new one\r
2907                     if (_M_messageArray[i] != NULL)\r
2908                     {\r
2909                         delete _M_messageArray[i];\r
2910                     }\r
2911                     _M_messageArray[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);\r
2912                     _M_savedIdBuffer[i] = -1;\r
2913                     _NewMessages++;\r
2914                 }\r
2915             }\r
2916 \r
2917             if (_NewMessages == 0)\r
2918             {\r
2919                 // There is no need to recal if we did not consume a new message\r
2920                 return NULL;\r
2921             }\r
2922 \r
2923             std::vector<_Input> _OutputVector;\r
2924             for (size_t i = 0; i < _NumInputs; i++)\r
2925             {\r
2926                 _ASSERTE(_M_messageArray[i] != NULL);\r
2927                 _OutputVector.push_back(_M_messageArray[i]->payload);\r
2928             }\r
2929 \r
2930             _Output _Out = _M_pFunc(_OutputVector);\r
2931             return (new message<_Output>(_Out));\r
2932         }\r
2933 \r
2934         /// <summary>\r
2935         ///     Initialize the recalculate block\r
2936         /// </summary>\r
2937         /// <param name="_NumInputs">\r
2938         ///     The number of inputs\r
2939         /// </param>\r
2940         void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)\r
2941         {\r
2942             initialize_source_and_target(_PScheduler, _PScheduleGroup);\r
2943 \r
2944             _M_connectedSources.set_bound(_NumInputs);\r
2945 \r
2946             // Non greedy joins need a buffer to snap off saved message ids to.\r
2947             _M_savedIdBuffer = new runtime_object_identity[_NumInputs];\r
2948             memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);\r
2949         }\r
2950 \r
2951         // An array containing the accepted messages of this join.\r
2952         std::vector<message<_Input>*> _M_messageArray;\r
2953 \r
2954         // An array containing the msg ids of messages propagated to the array\r
2955         // For non-greedy joins, this contains the message id of any message \r
2956         // passed to it.\r
2957         std::vector<runtime_object_identity> _M_savedMessageIdArray;\r
2958 \r
2959         // Buffer for snapping saved ids in non-greedy joins\r
2960         runtime_object_identity * _M_savedIdBuffer;\r
2961 \r
2962         // The transformer method called by this block\r
2963         _Recalculate_method _M_pFunc;\r
2964 \r
2965         // Queue to hold output messages\r
2966         MessageQueue<_Output> _M_messageBuffer;\r
2967     };\r
2968 \r
2969     //\r
2970     // Container class to hold a join_transform block and keep unbounded buffers in front of each input.\r
2971     //\r
2972     template<class _Input, class _Output>\r
2973     class buffered_join\r
2974     {\r
2975         typedef std::tr1::function<_Output(std::vector<_Input> const&)> _Transform_method;\r
2976 \r
2977     public:\r
2978         buffered_join(int _NumInputs, _Transform_method const& _Func): m_currentInput(0), m_numInputs(_NumInputs)\r
2979         {\r
2980             m_buffers = new unbounded_buffer<_Input>*[_NumInputs];\r
2981             m_join = new join_transform<_Input,_Output,greedy>(_NumInputs, _Func);\r
2982 \r
2983             for(int i = 0; i < _NumInputs; i++)\r
2984             {\r
2985                 m_buffers[i] = new unbounded_buffer<_Input>;\r
2986                 m_buffers[i]->link_target(m_join);\r
2987             }\r
2988         }\r
2989 \r
2990         ~buffered_join()\r
2991         {\r
2992             for(int i = 0; i < m_numInputs; i++)\r
2993                 delete m_buffers[i];\r
2994             delete [] m_buffers;\r
2995             delete m_join;\r
2996         }\r
2997 \r
2998         // Add input takes _PSource and connects it to the next input on this block\r
2999         void add_input(ISource<_Input> * _PSource)\r
3000         {\r
3001             _PSource->link_target(m_buffers[m_currentInput]);\r
3002             m_currentInput++;\r
3003         }\r
3004 \r
3005         // link_target links this container block to _PTarget\r
3006         void link_target(ITarget<_Output> * _PTarget)\r
3007         {\r
3008             m_join->link_target(_PTarget);\r
3009         }\r
3010     private:\r
3011 \r
3012         int m_currentInput;\r
3013         int m_numInputs;\r
3014         unbounded_buffer<_Input> ** m_buffers;\r
3015         join_transform<_Input,_Output,greedy> * m_join;\r
3016     };\r
3017 } // namespace Concurrency\r