]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/graph.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / graph.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_graph_H
30 #define __TBB_graph_H
31
32 #if !TBB_PREVIEW_GRAPH
33 #error Set TBB_PREVIEW_GRAPH to include graph.h
34 #endif
35
36 #include "tbb_stddef.h"
37 #include "atomic.h"
38 #include "spin_mutex.h"
39 #include "null_mutex.h"
40 #include "spin_rw_mutex.h"
41 #include "null_rw_mutex.h"
42 #include "task.h"
43 #include "concurrent_vector.h"
44 #include "_aggregator_internal.h"
45
46 // use the VC10 or gcc version of tuple if it is available.
47 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
48 #define TBB_PREVIEW_TUPLE 1
49 #include "compat/tuple"
50 #else
51 #include <tuple>
52 #endif
53
54 #include<list>
55 #include<queue>
56
57 /** @file
58   \brief The graph related classes and functions
59
60   There are some applications that best express dependencies as messages
61   passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
  class and its associated node classes can be used to express such
  applications.
65 */
66
67 namespace tbb {
68
69     //! The base of all graph nodes.  Allows them to be stored in a collection for deletion.
70     class graph_node {
71     public:
72         virtual ~graph_node() {} 
73     }; 
74
75     //! An empty class used for messages that mean "I'm done" 
76     class continue_msg {};
77
78     template< typename T > class sender;
79     template< typename T > class receiver;
80     class continue_receiver;
81
82     //! Pure virtual template class that defines a sender of messages of type T
83     template< typename T >
84     class sender {
85     public:
86         //! The output type of this sender
87         typedef T output_type;
88
89         //! The successor type for this node
90         typedef receiver<T> successor_type;
91
92         virtual ~sender() {}
93
94         //! Add a new successor to this node
95         virtual bool register_successor( successor_type &r ) = 0;
96
97         //! Removes a successor from this node
98         virtual bool remove_successor( successor_type &r ) = 0;
99
100         //! Request an item from the sender
101         virtual bool try_get( T & ) { return false; }
102
103         //! Reserves an item in the sender 
104         virtual bool try_reserve( T & ) { return false; }
105
106         //! Releases the reserved item
107         virtual bool try_release( ) { return false; }
108
109         //! Consumes the reserved item
110         virtual bool try_consume( ) { return false; }
111
112     };
113
114
115     //! Pure virtual template class that defines a receiver of messages of type T
116     template< typename T >
117     class receiver {
118     public:
119
120         //! The input type of this receiver
121         typedef T input_type;
122
123         //! The predecessor type for this node
124         typedef sender<T> predecessor_type;
125
126         //! Destructor
127         virtual ~receiver() {}
128
129         //! Put an item to the receiver
130         virtual bool try_put( T t ) = 0;
131
132         //! Add a predecessor to the node
133         virtual bool register_predecessor( predecessor_type & ) { return false; }
134
135         //! Remove a predecessor from the node
136         virtual bool remove_predecessor( predecessor_type & ) { return false; }
137
138     };
139
140     //! Base class for receivers of completion messages
141     /** These receivers automatically reset, but cannot be explicitly waited on */
142     class continue_receiver : public receiver< continue_msg > {
143     public:
144
145         //! The input type
146         typedef continue_msg input_type;
147
148         //! The predecessor type for this node
149         typedef sender< continue_msg > predecessor_type;
150
151         //! Constructor
152         continue_receiver( int number_of_predecessors = 0 ) { 
153             my_predecessor_count = number_of_predecessors;
154             my_current_count = 0;
155         }
156
157         //! Destructor
158         virtual ~continue_receiver() { }
159
160         //! Increments the trigger threshold
161         /* override */ bool register_predecessor( predecessor_type & ) {
162             spin_mutex::scoped_lock l(my_mutex);
163             ++my_predecessor_count;
164             return true;
165         }
166
167         //! Decrements the trigger threshold
168         /** Does not check to see if the removal of the predecessor now makes the current count
169             exceed the new threshold.  So removing a predecessor while the graph is active can cause
170             unexpected results. */
171         /* override */ bool remove_predecessor( predecessor_type & ) {
172             spin_mutex::scoped_lock l(my_mutex);
173             --my_predecessor_count;
174             return true;
175         }
176
177         //! Puts a continue_msg to the receiver
178         /** If the message causes the message count to reach the predecessor count, execute() is called and
179             the message count is reset to 0.  Otherwise the message count is incremented. */
180         /* override */ bool try_put( input_type ) {
181             {
182                 spin_mutex::scoped_lock l(my_mutex);
183                 if ( ++my_current_count < my_predecessor_count ) 
184                     return true;
185                 else
186                     my_current_count = 0;
187             }
188             execute();
189             return true;
190         }
191
192     protected:
193
194         spin_mutex my_mutex;
195         int my_predecessor_count;
196         int my_current_count;
197
198         //! Does whatever should happen when the threshold is reached
199         /** This should be very fast or else spawn a task.  This is
200             called while the sender is blocked in the try_put(). */
201         virtual void execute() = 0;
202
203     };
204
205     //! @cond INTERNAL
206     namespace internal {
207
208         //! The state of an executable node
209         enum node_state { node_state_idle=0, node_state_nonidle=1, node_state_inactive=2 };
210
211
212         //! A functor that takes no input and generates a value of type Output
213         template< typename Output >
214         class source_body : no_assign   {
215         public:
216             virtual ~source_body() {}
217             virtual bool operator()(Output &output) = 0;
218         };
219
220         //! The leaf for source_body
221         template< typename Output, typename Body>
222         class source_body_leaf : public source_body<Output> {
223         public:
224             source_body_leaf( Body _body ) : body(_body) { }
225             /*override */ bool operator()(Output &output) { return body( output ); }
226         private:
227             Body body;
228         };
229
230         //! A functor that takes an Input and generates an Output
231         template< typename Input, typename Output >
232             class function_body : no_assign {
233         public:
234             virtual ~function_body() {}
235             virtual Output operator()(Input input) = 0;
236         };
237
238         //! the leaf for function_body
239         template <typename Input, typename Output, typename B>
240         class function_body_leaf : public function_body< Input, Output > {
241         public:
242             function_body_leaf( B _body ) : body(_body) { }
243             Output operator()(Input i) { return body(i); }
244
245         private:
246             B body;
247         };
248
249         //! the leaf for function_body specialized for Input and output of continue_msg
250         template <typename B>
251         class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
252         public:
253             function_body_leaf( B _body ) : body(_body) { }
254             continue_msg operator()( continue_msg i ) { 
255                 body(i); 
256                 return i; 
257             }
258
259         private:
260             B body;
261         };
262
263         //! the leaf for function_body specialized for Output of continue_msg
264         template <typename Input, typename B>
265         class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
266         public:
267             function_body_leaf( B _body ) : body(_body) { }
268             continue_msg operator()(Input i) { 
269                 body(i); 
270                 return continue_msg();
271             }
272
273         private:
274             B body;
275         };
276
277         //! the leaf for function_body specialized for Input of continue_msg
278         template <typename Output, typename B>
279         class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
280         public:
281             function_body_leaf( B _body ) : body(_body) { }
282             Output operator()(continue_msg i) { 
283                 return body(i); 
284             }
285
286         private:
287             B body;
288         };
289
290         //! A task that calls a node's forward function
291         template< typename NodeType >
292         class forward_task : public task {
293
294             NodeType &my_node;
295
296         public:
297
298             forward_task( NodeType &n ) : my_node(n) {}
299
300             task *execute() {
301                 my_node.forward();
302                 return NULL;
303             }
304         };
305
306         //! A task that calls a node's apply_body function, passing in an input of type Input
307         template< typename NodeType, typename Input >
308         class apply_body_task : public task {
309
310             NodeType &my_node;
311             Input my_input;
312
313         public:
314
315             apply_body_task( NodeType &n, Input i ) : my_node(n), my_input(i) {}
316
317             task *execute() {
318                 my_node.apply_body( my_input );
319                 return NULL;
320             }
321         };
322
323         //! A task that calls a node's apply_body function with no input
324         template< typename NodeType >
325         class source_task : public task {
326
327             NodeType &my_node;
328
329         public:
330
331             source_task( NodeType &n ) : my_node(n) {}
332
333             task *execute() {
334                 my_node.apply_body( );
335                 return NULL;
336             }
337         };
338
339         //! An empty functor that takes an Input and returns a default constructed Output
340         template< typename Input, typename Output >
341         struct empty_body {
342            Output operator()( const Input & ) const { return Output(); } 
343         };
344
345         //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock. 
346         template< typename T, typename M=spin_mutex >
347         class node_cache {
348             public:
349
350             typedef size_t size_type;
351
352             bool empty() {
353                 typename my_mutex_type::scoped_lock lock( my_mutex );
354                 return internal_empty();
355             }
356
357             void add( T &n ) {
358                 typename my_mutex_type::scoped_lock lock( my_mutex );
359                 internal_push(n);
360             }
361
362             void remove( T &n ) {
363                 typename my_mutex_type::scoped_lock lock( my_mutex );
364                 for ( size_t i = internal_size(); i != 0; --i ) {
365                     T &s = internal_pop();
366                     if ( &s != &n ) {
367                         internal_push(s);
368                     }
369                 }
370             }
371
372         protected:
373
374             typedef M my_mutex_type;
375             my_mutex_type my_mutex;
376             std::queue< T * > my_q;
377
378             // Assumes lock is held
379             inline bool internal_empty( )  {
380                 return my_q.empty();
381             }
382
383             // Assumes lock is held
384             inline size_type internal_size( )  {
385                 return my_q.size(); 
386             }
387
388             // Assumes lock is held
389             inline void internal_push( T &n )  {
390                 my_q.push(&n);
391             }
392
393             // Assumes lock is held
394             inline T &internal_pop() {
395                 T *v = my_q.front();
396                 my_q.pop();
397                 return *v;
398             }
399
400         };
401
402         //! A cache of predecessors that only supports try_get
403         template< typename T, typename M=spin_mutex >
404         class predecessor_cache : public node_cache< sender<T>, M > {
405             public:
406             typedef M my_mutex_type;
407             typedef T output_type; 
408             typedef sender<output_type> predecessor_type;
409             typedef receiver<output_type> successor_type;
410
411             predecessor_cache( ) : my_owner( NULL ) { }
412
413             void set_owner( successor_type *owner ) { my_owner = owner; }
414
415             bool get_item( output_type &v ) {
416
417                 bool msg = false;
418
419                 do {
420                     predecessor_type *src;
421                     {
422                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
423                         if ( this->internal_empty() ) {
424                             break;
425                         }
426                         src = &this->internal_pop();
427                     }
428
429                     // Try to get from this sender
430                     msg = src->try_get( v );
431
432                     if (msg == false) {
433                         // Relinquish ownership of the edge
434                         if ( my_owner) 
435                             src->register_successor( *my_owner );
436                     } else {
437                         // Retain ownership of the edge
438                         this->add(*src);
439                     }
440                 } while ( msg == false );
441                 return msg;
442             }
443
444         protected:
445             successor_type *my_owner;
446         };
447
448         //! An cache of predecessors that supports requests and reservations
449         template< typename T, typename M=spin_mutex >
450         class reservable_predecessor_cache : public predecessor_cache< T, M > {
451         public:
452             typedef M my_mutex_type;
453             typedef T output_type; 
454             typedef sender<T> predecessor_type;
455             typedef receiver<T> successor_type;
456
457             reservable_predecessor_cache( ) : reserved_src(NULL) { }
458
459             bool 
460             try_reserve( output_type &v ) {
461                 bool msg = false;
462
463                 do {
464                     {
465                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
466                         if ( reserved_src || this->internal_empty() ) 
467                             return false;
468
469                         reserved_src = &this->internal_pop();
470                     }
471
472                     // Try to get from this sender
473                     msg = reserved_src->try_reserve( v );
474
475                     if (msg == false) {
476                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
477                         // Relinquish ownership of the edge
478                         reserved_src->register_successor( *this->my_owner );
479                         reserved_src = NULL;
480                     } else {
481                         // Retain ownership of the edge
482                         this->add( *reserved_src );
483                     }
484                 } while ( msg == false );
485
486                 return msg;
487             }
488
489             bool 
490             try_release( ) {
491                 reserved_src->try_release( );
492                 reserved_src = NULL;
493                 return true;
494             }
495
496             bool 
497             try_consume( ) {
498                 reserved_src->try_consume( );
499                 reserved_src = NULL;
500                 return true;
501             }
502
503         private:
504             predecessor_type *reserved_src;
505         };
506
507
508         //! An abstract cache of succesors
509         template<typename T, typename M=spin_rw_mutex >
510         class successor_cache : no_copy {
511         protected:
512
513             typedef M my_mutex_type;
514             my_mutex_type my_mutex;
515
516             typedef std::list< receiver<T> * > my_successors_type;
517             my_successors_type my_successors;
518
519             sender<T> *my_owner;
520
521         public:
522
523             successor_cache( ) : my_owner(NULL) {}
524
525             void set_owner( sender<T> *owner ) { my_owner = owner; }
526
527             virtual ~successor_cache() {}
528
529             void register_successor( receiver<T> &r ) {
530                 typename my_mutex_type::scoped_lock l(my_mutex, true);
531                 my_successors.push_back( &r ); 
532             }
533
534             void remove_successor( receiver<T> &r ) {
535                 typename my_mutex_type::scoped_lock l(my_mutex, true);
536                 for ( typename my_successors_type::iterator i = my_successors.begin();
537                       i != my_successors.end(); ++i ) { 
538                     if ( *i == & r ) { 
539                         my_successors.erase(i);
540                         break;
541                     }
542                 }
543             }
544
545             bool empty() { 
546                 typename my_mutex_type::scoped_lock l(my_mutex, false);
547                 return my_successors.empty(); 
548             }
549
550             virtual bool try_put( T t ) = 0; 
551          };
552
553         //! An abstract cache of succesors, specialized to continue_msg
554         template<>
555         class successor_cache< continue_msg > : no_copy {
556         protected:
557
558             typedef spin_rw_mutex my_mutex_type;
559             my_mutex_type my_mutex;
560
561             typedef std::list< receiver<continue_msg> * > my_successors_type;
562             my_successors_type my_successors;
563
564             sender<continue_msg> *my_owner;
565
566         public:
567
568             successor_cache( ) : my_owner(NULL) {}
569
570             void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
571
572             virtual ~successor_cache() {}
573
574             void register_successor( receiver<continue_msg> &r ) {
575                 my_mutex_type::scoped_lock l(my_mutex, true);
576                 my_successors.push_back( &r ); 
577                 if ( my_owner )
578                     r.register_predecessor( *my_owner );
579             }
580
581             void remove_successor( receiver<continue_msg> &r ) {
582                 my_mutex_type::scoped_lock l(my_mutex, true);
583                 for ( my_successors_type::iterator i = my_successors.begin();
584                       i != my_successors.end(); ++i ) { 
585                     if ( *i == & r ) { 
586                         if ( my_owner )
587                             r.remove_predecessor( *my_owner );
588                         my_successors.erase(i);
589                         break;
590                     }
591                 }
592             }
593
594             bool empty() { 
595                 my_mutex_type::scoped_lock l(my_mutex, false);
596                 return my_successors.empty(); 
597             }
598
599             virtual bool try_put( continue_msg t ) = 0; 
600
601          };
602
603         //! A cache of successors that are broadcast to
604         template<typename T, typename M=spin_rw_mutex>
605         class broadcast_cache : public successor_cache<T, M> {
606             typedef M my_mutex_type;
607             typedef std::list< receiver<T> * > my_successors_type;
608
609         public:
610
611             broadcast_cache( ) {}
612
613             bool try_put( T t ) {
614                 bool msg = false;
615                 bool upgraded = false;
616                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
617                 typename my_successors_type::iterator i = this->my_successors.begin();
618                 while ( i != this->my_successors.end() ) {
619                    if ( (*i)->try_put( t ) == true ) {
620                        ++i;
621                        msg = true;
622                    } else {
623                       if ( (*i)->register_predecessor(*this->my_owner) ) {
624                           if (!upgraded) {
625                               l.upgrade_to_writer();
626                               upgraded = true;
627                           }
628                           i = this->my_successors.erase(i);
629                       }
630                       else {
631                           ++i;
632                       }
633                    }
634                 }
635                 return msg;
636             }
637         };
638
639         //! A cache of successors that are put in a round-robin fashion
640         template<typename T, typename M=spin_rw_mutex >
641         class round_robin_cache : public successor_cache<T, M> {
642             typedef size_t size_type;
643             typedef M my_mutex_type;
644             typedef std::list< receiver<T> * > my_successors_type;
645
646         public:
647
648             round_robin_cache( ) {}
649
650             size_type size() {
651                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
652                 return this->my_successors.size();
653             }
654
655             bool try_put( T t ) {
656                 bool upgraded = false;
657                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
658                 typename my_successors_type::iterator i = this->my_successors.begin();
659                 while ( i != this->my_successors.end() ) {
660                    if ( (*i)->try_put( t ) ) {
661                        return true;
662                    } else {
663                       if ( (*i)->register_predecessor(*this->my_owner) ) {
664                           if (!upgraded) {
665                               l.upgrade_to_writer();
666                               upgraded = true;
667                           }
668                           i = this->my_successors.erase(i);
669                       }
670                       else {
671                           ++i;
672                       }
673                    }
674                 }
675                 return false;
676             }
677         };
678
679         template<typename T>
680         class decrementer : public continue_receiver, internal::no_copy {
681
682             T *my_node;
683
684             void execute() {
685                 my_node->decrement_counter();
686             }
687
688         public:
689            
690             typedef continue_msg input_type;
691             typedef continue_msg output_type;
692             decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
693             void set_owner( T *node ) { my_node = node; }
694         };
695
696     }
697     //! @endcond INTERNAL
698
699
700     //! The graph class
701     /** This class serves as a handle to the graph */
702     class graph : internal::no_copy {
703
704         template< typename Body >
705         class run_task : public task {
706         public: 
707             run_task( Body& body ) : my_body(body) {}
708             task *execute() {
709                 my_body();
710                 return NULL;
711             }
712         private:
713             Body my_body;
714         };
715
716         template< typename Receiver, typename Body >
717         class run_and_put_task : public task {
718         public: 
719             run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
720             task *execute() {
721                 my_receiver.try_put( my_body() );
722                 return NULL;
723             }
724         private:
725             Receiver &my_receiver;
726             Body my_body;
727         };
728
729     public:
730
731         //! An enumeration the provides the two most common concurrency levels: unlimited and serial
732         enum concurrency { unlimited = 0, serial = 1 };
733
734         //! Constructs a graph withy no nodes.
735         graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
736             my_root_task->set_ref_count(1);
737         }
738
739         //! Destroys the graph.
740         /** Calls wait_for_all on the graph, deletes all of the nodes appended by calls to add, and then 
741             destroys the root task of the graph. */ 
742         ~graph() {
743             wait_for_all();
744             my_root_task->set_ref_count(0);
745             task::destroy( *my_root_task );
746         }
747
748
749         //! Used to register that an external entity may still interact with the graph.
750         /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
751             is made. */
752         void increment_wait_count() { 
753             if (my_root_task)
754                 my_root_task->increment_ref_count();
755         }
756
757         //! Deregisters an external entity that may have interacted with the graph.
758         /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
759             matches the number of increment_wait_count calls. */
760         void decrement_wait_count() { 
761             if (my_root_task)
762                 my_root_task->decrement_ref_count(); 
763         }
764
765         //! Spawns a task that runs a body and puts its output to a specific receiver
766         /** The task is spawned as a child of the graph. This is useful for running tasks 
767             that need to block a wait_for_all() on the graph.  For example a one-off source. */
768         template< typename Receiver, typename Body >
769             void run( Receiver &r, Body body ) {
770            task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
771                run_and_put_task< Receiver, Body >( r, body ) );
772         }
773
774         //! Spawns a task that runs a function object 
775         /** The task is spawned as a child of the graph. This is useful for running tasks 
776             that need to block a wait_for_all() on the graph. For example a one-off source. */
777         template< typename Body >
778         void run( Body body ) {
779            task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
780                run_task< Body >( body ) );
781         }
782
783         //! Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
784         /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
785         void wait_for_all() {
786             if (my_root_task)
787                 my_root_task->wait_for_all();
788             my_root_task->set_ref_count(1);
789         }
790
791         //! Returns the root task of the graph
792         task * root_task() {
793             return my_root_task;
794         }
795
796     private:
797
798         task *my_root_task;
799
800     };
801
802
803     //! @cond INTERNAL
804     namespace internal {
805
806         //! Implements methods for a function node that takes a type T as input
807         template< typename Input, typename Output >
808         class function_input : public receiver<Input>, no_assign {
809             typedef sender<Input> predecessor_type;
810             enum op_stat {WAIT=0, SUCCEEDED, FAILED};
811             enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
812             typedef function_input<Input, Output> my_class;
813
814         public:
815             //! The input type of this receiver
816             typedef Input input_type;
817             //! The output type of this receiver
818             typedef Output output_type;
819
820             //! Constructor for function_input
821             template< typename Body >
822             function_input( graph &g, size_t max_concurrency, Body& body )
823                 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(internal::node_state_idle),
824                   my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
825                 forwarder_busy(false) {
826                 my_predecessors.set_owner(this);
827                 my_aggregator.initialize_handler(my_handler(this));
828             }
829
830             //! Destructor
831             virtual ~function_input() { delete my_body; }
832
833             //! Put to the node
834             virtual bool try_put( input_type t ) {
835                if ( my_max_concurrency == 0 ) {
836                    spawn_body_task( t );
837                    return true;
838                } else {
839                    my_operation op_data(t, tryput);
840                    my_aggregator.execute(&op_data);
841                    return op_data.status == SUCCEEDED;
842                }
843             }
844
845             //! Adds src to the list of cached predecessors.
846             /* override */ bool register_predecessor( predecessor_type &src ) {
847                 my_operation op_data(reg_pred);
848                 op_data.r = &src;
849                 my_aggregator.execute(&op_data);
850                 return true;
851             }
852
853             //! Removes src from the list of cached predecessors.
854             /* override */ bool remove_predecessor( predecessor_type &src ) {
855                 my_operation op_data(rem_pred);
856                 op_data.r = &src;
857                 my_aggregator.execute(&op_data);
858                 return true;
859             }
860
861         protected:
862             task *my_root_task;
863             const size_t my_max_concurrency;
864             size_t my_concurrency;
865             function_body<input_type, output_type> *my_body;
866             predecessor_cache<input_type, null_mutex > my_predecessors;
867
868             virtual broadcast_cache<output_type > &successors() = 0;
869
870         private:
871             friend class apply_body_task< function_input< input_type, output_type >, input_type >;
872             friend class forward_task< function_input< input_type, output_type > >;
873
874             class my_operation : public aggregated_operation< my_operation > {
875             public:
876                 char type;
877                 union {
878                     input_type *elem;
879                     predecessor_type *r;
880                 };
881                 my_operation(const input_type& e, op_type t) :
882                     type(char(t)), elem(const_cast<input_type*>(&e)) {}
883                 my_operation(op_type t) : type(char(t)), r(NULL) {}
884             };
885
886             bool forwarder_busy;
887             typedef internal::aggregating_functor<my_class, my_operation> my_handler;
888             friend class internal::aggregating_functor<my_class, my_operation>;
889             aggregator< my_handler, my_operation > my_aggregator;
890
891             void handle_operations(my_operation *op_list) {
892                 my_operation *tmp;
893                 while (op_list) {
894                     tmp = op_list;
895                     op_list = op_list->next;
896                     switch (tmp->type) {
897                     case reg_pred:
898                         my_predecessors.add(*(tmp->r));
899                         __TBB_store_with_release(tmp->status, SUCCEEDED);
900                         if (!forwarder_busy) {
901                             forwarder_busy = true;
902                             spawn_forward_task();
903                         }
904                         break;
905                     case rem_pred:
906                         my_predecessors.remove(*(tmp->r));
907                         __TBB_store_with_release(tmp->status, SUCCEEDED);
908                         break;
909                     case app_body:
910                         __TBB_ASSERT(my_max_concurrency != 0, NULL);
911                         --my_concurrency;
912                         __TBB_store_with_release(tmp->status, SUCCEEDED);
913                         if (my_concurrency<my_max_concurrency) {
914                             input_type i;
915                             if (my_predecessors.get_item(i)) {
916                                 ++my_concurrency;
917                                 spawn_body_task(i);
918                             }
919                         }
920                         break;
921                     case tryput: internal_try_put(tmp);  break;
922                     case try_fwd: internal_forward(tmp);  break;
923                     }
924                 }
925             }
926
927             //! Put to the node
928             void internal_try_put(my_operation *op) {
929                 __TBB_ASSERT(my_max_concurrency != 0, NULL);
930                 if (my_concurrency < my_max_concurrency) {
931                    ++my_concurrency;
932                    spawn_body_task(*(op->elem));
933                    __TBB_store_with_release(op->status, SUCCEEDED);
934                } else {
935                    __TBB_store_with_release(op->status, FAILED);
936                }
937             }
938
939             //! Tries to spawn bodies if available and if concurrency allows
940             void internal_forward(my_operation *op) {
941                 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
942                     input_type i;
943                     if (my_predecessors.get_item(i)) {
944                         ++my_concurrency;
945                         __TBB_store_with_release(op->status, SUCCEEDED);
946                         spawn_body_task(i);
947                         return;
948                     }
949                 }
950                 __TBB_store_with_release(op->status, FAILED);
951                 forwarder_busy = false;
952             }
953
954             //! Applies the body to the provided input
955             void apply_body( input_type &i ) {
956                 successors().try_put( (*my_body)(i) );
957                 if ( my_max_concurrency != 0 ) {
958                     my_operation op_data(app_body);
959                     my_aggregator.execute(&op_data);
960                 }
961             }
962
963            //! Spawns a task that calls apply_body( input )
964            inline void spawn_body_task( input_type &input ) {
965                task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task<function_input<input_type, output_type>, input_type >(*this, input));
966            }
967
968            //! This is executed by an enqueued task, the "forwarder"
969            void forward() {
970                my_operation op_data(try_fwd);
971                do {
972                    op_data.status = WAIT;
973                    my_aggregator.execute(&op_data);
974                } while (op_data.status == SUCCEEDED);
975            }
976
977            //! Spawns a task that calls forward()
978            inline void spawn_forward_task() {
979                task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task<function_input<input_type, output_type> >(*this));
980            }
981         };
982
983         //! Implements methods for an executable node that takes continue_msg as input
984         template< typename Output >
985         class continue_input : public continue_receiver {
986         public:
987
988             //! The input type of this receiver
989             typedef continue_msg input_type;
990     
991             //! The output type of this receiver
992             typedef Output output_type;
993
994             template< typename Body >
995             continue_input( graph &g, Body& body )
996                 : my_root_task(g.root_task()), 
997                  my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
998
999             template< typename Body >
1000             continue_input( graph &g, int number_of_predecessors, Body& body )
1001                 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()), 
1002                  my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
1003
1004         protected:
1005
1006             task *my_root_task;
1007             function_body<input_type, output_type> *my_body;
1008
1009             virtual broadcast_cache<output_type > &successors() = 0; 
1010
1011             friend class apply_body_task< continue_input< Output >, continue_msg >;
1012
1013             //! Applies the body to the provided input
1014             /* override */ void apply_body( input_type ) {
1015                 successors().try_put( (*my_body)( continue_msg() ) );
1016             }
1017
1018             //! Spawns a task that applies the body
1019             /* override */ void execute( ) {
1020                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1021                    apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) ); 
1022             }
1023         };
1024
1025         //! Implements methods for both executable and function nodes that puts Output to its successors
1026         template< typename Output >
1027         class function_output : public sender<Output> {
1028         public:
1029
1030             typedef Output output_type;
1031
1032             function_output() { }
1033
1034             //! Adds a new successor to this node
1035             /* override */ bool register_successor( receiver<output_type> &r ) {
1036                 successors().register_successor( r );
1037                 return true;
1038             }
1039
1040             //! Removes a successor from this node
1041             /* override */ bool remove_successor( receiver<output_type> &r ) {
1042                 successors().remove_successor( r );
1043                 return true;
1044             }
1045   
1046         protected:
1047
1048             virtual broadcast_cache<output_type > &successors() = 0; 
1049
1050         };
1051
1052     }
1053     //! @endcond INTERNAL
1054
1055     //! An executable node that acts as a source, i.e. it has no predecessors
1056     template < typename Output >
1057     class source_node : public graph_node, public sender< Output > {
1058     public:
1059
1060         //! The type of the output message, which is complete
1061         typedef Output output_type;           
1062
1063         //! The type of successors of this node
1064         typedef receiver< Output > successor_type;
1065
1066         //! Constructor for a node with a successor
1067         template< typename Body >
1068         source_node( graph &g, Body body, bool is_active = true )
1069              : my_root_task(g.root_task()), my_state( is_active ? internal::node_state_idle : internal::node_state_inactive ),
1070               my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1071               my_reserved(false), my_has_cached_item(false) { 
1072             my_successors.set_owner(this);
1073         }
1074
1075         //! The destructor
1076         ~source_node() { delete my_body; }
1077
1078         //! Add a new successor to this node
1079         /* override */ bool register_successor( receiver<output_type> &r ) {
1080             spin_mutex::scoped_lock lock(my_mutex);
1081             my_successors.register_successor(r);
1082             if ( my_state != internal::node_state_inactive )
1083                 spawn_put();
1084             return true;
1085         }
1086
1087         //! Removes a successor from this node
1088         /* override */ bool remove_successor( receiver<output_type> &r ) {
1089             spin_mutex::scoped_lock lock(my_mutex);
1090             my_successors.remove_successor(r);
1091             return true;
1092         }
1093
1094         //! Request an item from the node
1095         /*override */ bool try_get( output_type &v ) {
1096             spin_mutex::scoped_lock lock(my_mutex);
1097             if ( my_reserved )  
1098                 return false;
1099
1100             if ( my_has_cached_item ) {
1101                 v = my_cached_item;
1102                 my_has_cached_item = false;
1103             } else if ( (*my_body)(v) == false ) {
1104                 return false;
1105             }
1106             return true;
1107         }
1108
1109         //! Reserves an item.
1110         /* override */ bool try_reserve( output_type &v ) {
1111             spin_mutex::scoped_lock lock(my_mutex);
1112             if ( my_reserved ) {
1113                 return false;
1114             }
1115
1116             if ( !my_has_cached_item && (*my_body)(my_cached_item) )  
1117                 my_has_cached_item = true;
1118
1119             if ( my_has_cached_item ) {
1120                 v = my_cached_item;
1121                 my_reserved = true;
1122                 return true;
1123             } else {
1124                 return false;
1125             }
1126         }
1127
1128         //! Release a reserved item.  
1129         /**  true = item has been released and so remains in sender, dest must request or reserve future items */
1130         /* override */ bool try_release( ) {
1131             spin_mutex::scoped_lock lock(my_mutex);
1132             __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1133             my_reserved = false;
1134             spawn_put();
1135             return true;
1136         }
1137
1138         //! Consumes a reserved item
1139         /* override */ bool try_consume( ) {
1140             spin_mutex::scoped_lock lock(my_mutex);
1141             __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1142             my_reserved = false;
1143             my_has_cached_item = false;
1144             if ( !my_successors.empty() ) {
1145                 spawn_put();
1146             }
1147             return true;
1148         }
1149
1150         //! Activates a node that was created in the inactive state
1151         void activate() {
1152             spin_mutex::scoped_lock lock(my_mutex);
1153             my_state = internal::node_state_idle;
1154             if ( !my_successors.empty() )
1155                 spawn_put();
1156         }
1157
1158     private:
1159
1160         task *my_root_task;
1161         spin_mutex my_mutex;
1162         internal::node_state my_state;
1163         internal::source_body<output_type> *my_body;
1164         internal::broadcast_cache< output_type > my_successors;
1165         bool my_reserved;
1166         bool my_has_cached_item;
1167         output_type my_cached_item;
1168
1169         friend class internal::source_task< source_node< output_type > >;
1170
1171         //! Applies the body
1172         /* override */ void apply_body( ) {
1173             output_type v;
1174             if ( try_reserve(v) == false )
1175                 return;
1176
1177             if ( my_successors.try_put( v ) ) 
1178                 try_consume();
1179             else
1180                 try_release();
1181         }
1182
1183         //! Spawns a task that applies the body
1184         /* override */ void spawn_put( ) {
1185             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1186                internal::source_task< source_node< output_type > >( *this ) ); 
1187         }
1188
1189     };
1190
1191     //! Implements a function node that supports Input -> Output
1192     template <typename Input, typename Output = continue_msg >
1193     class function_node : public graph_node, public internal::function_input<Input,Output>, public internal::function_output<Output> {
1194     public:
1195
1196         typedef Input input_type;
1197         typedef Output output_type;
1198         typedef sender< input_type > predecessor_type;
1199         typedef receiver< output_type > successor_type;
1200
1201         //! Constructor
1202         template< typename Body >
1203         function_node( graph &g, size_t concurrency, Body body )
1204         : internal::function_input<input_type,output_type>( g, concurrency, body ) {
1205             my_successors.set_owner(this);
1206         }
1207
1208     protected:
1209
1210         internal::broadcast_cache<output_type> my_successors; 
1211         /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
1212
1213     };
1214
1215     //! Implements an executable node that supports continue_msg -> Output
1216     template <typename Output>
1217     class executable_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
1218     public:
1219
1220         typedef continue_msg input_type;
1221         typedef Output output_type;
1222         typedef sender< input_type > predecessor_type;
1223         typedef receiver< output_type > successor_type;
1224
1225          //! Constructor for executable node with continue_msg -> Output
1226          template <typename Body >
1227          executable_node( graph &g, Body body )
1228                  : internal::continue_input<output_type>( g, body ) {
1229              my_successors.set_owner(this);
1230          }
1231
1232          //! Constructor for executable node with continue_msg -> Output
1233          template <typename Body >
1234          executable_node( graph &g, int number_of_predecessors, Body body )
1235                  : internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1236              my_successors.set_owner(this);
1237          }
1238
1239     protected:
1240
1241         internal::broadcast_cache<output_type> my_successors; 
1242         /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
1243
1244     };
1245
1246
1247
1248     template< typename T >
1249     class overwrite_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
1250     public:
1251
1252         typedef T input_type;
1253         typedef T output_type;
1254         typedef sender< input_type > predecessor_type;
1255         typedef receiver< output_type > successor_type;
1256
1257         overwrite_node() : my_buffer_is_valid(false) {
1258             my_successors.set_owner( this );
1259         }
1260
1261         ~overwrite_node() {}
1262
1263         /* override */ bool register_successor( successor_type &s ) {
1264             spin_mutex::scoped_lock l( my_mutex );
1265             if ( my_buffer_is_valid ) {
1266                 // We have a valid value that must be forwarded immediately.
1267                 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
1268                     // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
1269                     my_successors.register_successor( s );
1270                     return true;
1271                 } else {
1272                     // We don't add the successor: it rejected our put and we became its predecessor instead
1273                     return false;
1274                 }
1275             } else {
1276                 // No valid value yet, just add as successor
1277                 my_successors.register_successor( s );
1278                 return true;
1279             }
1280         }
1281
1282         /* override */ bool remove_successor( successor_type &s ) {
1283             spin_mutex::scoped_lock l( my_mutex );
1284             my_successors.remove_successor(s);
1285             return true;
1286         }
1287
1288         /* override */ bool try_put( T v ) {
1289             spin_mutex::scoped_lock l( my_mutex );
1290             my_buffer = v;
1291             my_buffer_is_valid = true;
1292             my_successors.try_put(v);
1293             return true;
1294         }
1295
1296         /* override */ bool try_get( T &v ) {
1297             spin_mutex::scoped_lock l( my_mutex );
1298             if ( my_buffer_is_valid ) {
1299                 v = my_buffer;
1300                 return true;
1301             } else {
1302                 return false;
1303             }
1304         }
1305
1306         bool is_valid() {
1307            spin_mutex::scoped_lock l( my_mutex );
1308            return my_buffer_is_valid;
1309         }
1310
1311         void clear() {
1312            spin_mutex::scoped_lock l( my_mutex );
1313            my_buffer_is_valid = false;
1314         }
1315
1316     protected:
1317
1318         spin_mutex my_mutex;
1319         internal::broadcast_cache< T, null_rw_mutex > my_successors;
1320         T my_buffer;
1321         bool my_buffer_is_valid;
1322
1323     };
1324
1325     template< typename T >
1326     class write_once_node : public overwrite_node<T> {
1327     public:
1328
1329         typedef T input_type;
1330         typedef T output_type;
1331         typedef sender< input_type > predecessor_type;
1332         typedef receiver< output_type > successor_type;
1333
1334         /* override */ bool try_put( T v ) {
1335             spin_mutex::scoped_lock l( this->my_mutex );
1336             if ( this->my_buffer_is_valid ) {
1337                 return false;
1338             } else {
1339                 this->my_buffer = v;
1340                 this->my_buffer_is_valid = true;
1341                 this->my_successors.try_put(v);
1342                 return true;
1343             }
1344         }
1345     };
1346
1347     //! Broadcasts completion message when it receives completion messages from all predecessors. Then resets.
1348     /** Is equivalent to an executable_node< continue_msg > with an empty_body */
1349     class continue_node : public executable_node< continue_msg > { 
1350     public:
1351
1352         typedef continue_msg input_type;
1353         typedef continue_msg output_type;
1354         typedef sender< input_type > predecessor_type;
1355         typedef receiver< output_type > successor_type;
1356
1357         continue_node( graph &g ) : executable_node<continue_msg>( g, internal::empty_body< continue_msg, continue_msg>() ) {}
1358     };
1359
1360     //! Forwards messages of type T to all successors
1361     template <typename T>
1362     class broadcast_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
1363
1364         internal::broadcast_cache<T> my_successors;
1365
1366     public:
1367
1368         typedef T input_type;
1369         typedef T output_type;
1370         typedef sender< input_type > predecessor_type;
1371         typedef receiver< output_type > successor_type;
1372
1373         broadcast_node( ) {
1374            my_successors.set_owner( this ); 
1375         }
1376
1377         //! Adds a successor
1378         virtual bool register_successor( receiver<T> &r ) {
1379             my_successors.register_successor( r );
1380             return true;
1381         }
1382
1383         //! Removes s as a successor
1384         virtual bool remove_successor( receiver<T> &r ) {
1385             my_successors.remove_successor( r );
1386             return true;
1387         }
1388
1389         /* override */ bool try_put( T t ) {
1390             my_successors.try_put(t);
1391             return true;
1392         }
1393
1394     };
1395
1396 #include "_item_buffer.h"
1397
1398     //! Forwards messages in arbitrary order
1399     template <typename T, typename A=cache_aligned_allocator<T> >
1400         class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T>, internal::no_copy {
1401     public:
1402         typedef T input_type;
1403         typedef T output_type;
1404         typedef sender< input_type > predecessor_type;
1405         typedef receiver< output_type > successor_type;
1406         typedef buffer_node<T, A> my_class;
1407     protected:
1408         typedef size_t size_type;
1409         internal::round_robin_cache< T, null_rw_mutex > my_successors;
1410
1411         task *my_parent;
1412
1413         friend class internal::forward_task< buffer_node< T, A > >;
1414
1415         enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
1416         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1417
1418         // implements the aggregator_operation concept
1419         class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1420         public:
1421             char type;
1422             T *elem;
1423             successor_type *r;
1424             buffer_operation(const T& e, op_type t) :
1425                 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
1426             buffer_operation(op_type t) : type(char(t)), r(NULL) {}
1427         };
1428
1429         bool forwarder_busy;
1430         typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1431         friend class internal::aggregating_functor<my_class, buffer_operation>;
1432         internal::aggregator< my_handler, buffer_operation> my_aggregator;
1433
1434         virtual void handle_operations(buffer_operation *op_list) {
1435             buffer_operation *tmp;
1436             bool try_forwarding=false;
1437             while (op_list) {
1438                 tmp = op_list;
1439                 op_list = op_list->next;
1440                 switch (tmp->type) {
1441                 case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
1442                 case rem_succ: internal_rem_succ(tmp); break;
1443                 case req_item: internal_pop(tmp); break;
1444                 case res_item: internal_reserve(tmp); break;
1445                 case rel_res:  internal_release(tmp);  try_forwarding = true; break;
1446                 case con_res:  internal_consume(tmp);  try_forwarding = true; break;
1447                 case put_item: internal_push(tmp);  try_forwarding = true; break;
1448                 case try_fwd:  internal_forward(tmp); break;
1449                 }
1450             }
1451             if (try_forwarding && !forwarder_busy) {
1452                 forwarder_busy = true;
1453                 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
1454             }
1455         }
1456
1457         //! This is executed by an enqueued task, the "forwarder"
1458         virtual void forward() {
1459             buffer_operation op_data(try_fwd);
1460             do {
1461                 op_data.status = WAIT;
1462                 my_aggregator.execute(&op_data);
1463             } while (op_data.status == SUCCEEDED);
1464         }
1465
1466         //! Register successor
1467         virtual void internal_reg_succ(buffer_operation *op) {
1468             my_successors.register_successor(*(op->r));
1469             __TBB_store_with_release(op->status, SUCCEEDED);
1470         }
1471
1472         //! Remove successor
1473         virtual void internal_rem_succ(buffer_operation *op) {
1474             my_successors.remove_successor(*(op->r));
1475             __TBB_store_with_release(op->status, SUCCEEDED);
1476         }
1477
1478         //! Tries to forward valid items to successors
1479         virtual void internal_forward(buffer_operation *op) {
1480             T i_copy;
1481             bool success = false; // flagged when a successor accepts
1482             size_type counter = my_successors.size();
1483             // Try forwarding, giving each successor a chance
1484             while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
1485                 this->fetch_back(i_copy);
1486                 if( my_successors.try_put(i_copy) ) {
1487                     this->invalidate_back();
1488                     --(this->my_tail);
1489                     success = true; // found an accepting successor
1490                 }
1491                 --counter;
1492             }
1493             if (success && !counter)
1494                 __TBB_store_with_release(op->status, SUCCEEDED);
1495             else {
1496                 __TBB_store_with_release(op->status, FAILED);
1497                 forwarder_busy = false;
1498             }
1499         }
1500
1501         virtual void internal_push(buffer_operation *op) {
1502             this->push_back(*(op->elem));
1503             __TBB_store_with_release(op->status, SUCCEEDED);
1504         }
1505
1506         virtual void internal_pop(buffer_operation *op) {
1507             if(this->pop_back(*(op->elem))) {
1508                 __TBB_store_with_release(op->status, SUCCEEDED);
1509             }
1510             else {
1511                 __TBB_store_with_release(op->status, FAILED);
1512             }
1513         }
1514
1515         virtual void internal_reserve(buffer_operation *op) {
1516             if(this->reserve_front(*(op->elem))) {
1517                 __TBB_store_with_release(op->status, SUCCEEDED);
1518             }
1519             else {
1520                 __TBB_store_with_release(op->status, FAILED);
1521             }
1522         }
1523
1524         virtual void internal_consume(buffer_operation *op) {
1525             this->consume_front();
1526             __TBB_store_with_release(op->status, SUCCEEDED);
1527         }
1528
1529         virtual void internal_release(buffer_operation *op) {
1530             this->release_front();
1531             __TBB_store_with_release(op->status, SUCCEEDED);
1532         }
1533
1534     public:
1535         //! Constructor
1536         buffer_node( graph &g ) : reservable_item_buffer<T>(),
1537             my_parent( g.root_task() ), forwarder_busy(false) {
1538             my_successors.set_owner(this);
1539             my_aggregator.initialize_handler(my_handler(this));
1540         }
1541
1542         virtual ~buffer_node() {}
1543
1544         //
1545         // message sender implementation
1546         //
1547
1548         //! Adds a new successor.
1549         /** Adds successor r to the list of successors; may forward tasks.  */
1550         /* override */ bool register_successor( receiver<output_type> &r ) {
1551             buffer_operation op_data(reg_succ);
1552             op_data.r = &r;
1553             my_aggregator.execute(&op_data);
1554             return true;
1555         }
1556
1557         //! Removes a successor.
1558         /** Removes successor r from the list of successors.
1559             It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1560         /* override */ bool remove_successor( receiver<output_type> &r ) {
1561             r.remove_predecessor(*this);
1562             buffer_operation op_data(rem_succ);
1563             op_data.r = &r;
1564             my_aggregator.execute(&op_data);
1565             return true;
1566         }
1567
1568         //! Request an item from the buffer_node
1569         /**  true = v contains the returned item<BR>
1570              false = no item has been returned */
1571         /* override */ bool try_get( T &v ) {
1572             buffer_operation op_data(req_item);
1573             op_data.elem = &v;
1574             my_aggregator.execute(&op_data);
1575             return (op_data.status==SUCCEEDED);
1576         }
1577
1578         //! Reserves an item.
1579         /**  false = no item can be reserved<BR>
1580              true = an item is reserved */
1581         /* override */ bool try_reserve( T &v ) {
1582             buffer_operation op_data(res_item);
1583             op_data.elem = &v;
1584             my_aggregator.execute(&op_data);
1585             return (op_data.status==SUCCEEDED);
1586         }
1587
1588         //! Release a reserved item.
1589         /**  true = item has been released and so remains in sender */
1590         /* override */ bool try_release() {
1591             buffer_operation op_data(rel_res);
1592             my_aggregator.execute(&op_data);
1593             return true;
1594         }
1595
1596         //! Consumes a reserved item.
1597         /** true = item is removed from sender and reservation removed */
1598         /* override */ bool try_consume() {
1599             buffer_operation op_data(con_res);
1600             my_aggregator.execute(&op_data);
1601             return true;
1602         }
1603
1604         //! Receive an item
1605         /** true is always returned */
1606         /* override */ bool try_put(T t) {
1607             buffer_operation op_data(t, put_item);
1608             my_aggregator.execute(&op_data);
1609             return true;
1610         }
1611     };
1612
1613
1614     //! Forwards messages in FIFO order
1615     template <typename T, typename A=cache_aligned_allocator<T> >
1616     class queue_node : public buffer_node<T, A> {
1617     protected:
1618     typedef typename buffer_node<T, A>::size_type size_type;
1619     typedef typename buffer_node<T, A>::buffer_operation queue_operation;
1620
1621         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1622
1623         //! Tries to forward valid items to successors
1624         /* override */ void internal_forward(queue_operation *op) {
1625             T i_copy;
1626             bool success = false; // flagged when a successor accepts
1627             size_type counter = this->my_successors.size();
1628             if (this->my_reserved || !this->item_valid(this->my_head)){
1629                 __TBB_store_with_release(op->status, FAILED);
1630                 this->forwarder_busy = false;
1631                 return;
1632             }
1633             // Keep trying to send items while there is at least one accepting successor
1634             while (counter>0 && this->item_valid(this->my_head)) {
1635                 this->fetch_front(i_copy);
1636                 if(this->my_successors.try_put(i_copy)) {
1637                      this->invalidate_front();
1638                      ++(this->my_head);
1639                     success = true; // found an accepting successor
1640                 }
1641                 --counter;
1642             }
1643             if (success && !counter)
1644                 __TBB_store_with_release(op->status, SUCCEEDED);
1645             else {
1646                 __TBB_store_with_release(op->status, FAILED);
1647                 this->forwarder_busy = false;
1648             }
1649         }
1650
1651         /* override */ void internal_pop(queue_operation *op) {
1652             if ( this->my_reserved || !this->item_valid(this->my_head)){
1653                 __TBB_store_with_release(op->status, FAILED);
1654             }
1655             else {
1656                 this->pop_front(*(op->elem));
1657                 __TBB_store_with_release(op->status, SUCCEEDED);
1658             }
1659         }
1660         /* override */ void internal_reserve(queue_operation *op) {
1661             if (this->my_reserved || !this->item_valid(this->my_head)) {
1662                 __TBB_store_with_release(op->status, FAILED);
1663             }
1664             else {
1665                 this->my_reserved = true;
1666                 this->fetch_front(*(op->elem));
1667                 this->invalidate_front();
1668                 __TBB_store_with_release(op->status, SUCCEEDED);
1669             }
1670         }
1671         /* override */ void internal_consume(queue_operation *op) {
1672             this->consume_front();
1673             __TBB_store_with_release(op->status, SUCCEEDED);
1674         }
1675
1676     public:
1677
1678         typedef T input_type;
1679         typedef T output_type;
1680         typedef sender< input_type > predecessor_type;
1681         typedef receiver< output_type > successor_type;
1682
1683         //! Constructor
1684     queue_node( graph &g ) : buffer_node<T, A>(g) {}
1685     };
1686
1687     //! Forwards messages in sequence order
1688     template< typename T, typename A=cache_aligned_allocator<T> >
1689     class sequencer_node : public queue_node<T, A> {
1690         internal::function_body< T, size_t > *my_sequencer;
1691     public:
1692
1693         typedef T input_type;
1694         typedef T output_type;
1695         typedef sender< input_type > predecessor_type;
1696         typedef receiver< output_type > successor_type;
1697
1698         //! Constructor
1699         template< typename Sequencer >
1700         sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
1701             my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
1702
1703         //! Destructor
1704         ~sequencer_node() { delete my_sequencer; }
1705     protected:
1706         typedef typename buffer_node<T, A>::size_type size_type;
1707         typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
1708
1709         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1710
1711     private:
1712         /* override */ void internal_push(sequencer_operation *op) {
1713             size_type tag = (*my_sequencer)(*(op->elem));
1714
1715             this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1716
1717             if(this->size() > this->capacity())
1718                 this->grow_my_array(this->size());  // tail already has 1 added to it
1719             this->item(tag) = std::make_pair( *(op->elem), true );
1720             __TBB_store_with_release(op->status, SUCCEEDED);
1721         }
1722     };
1723
    //! Forwards messages in priority order
    /** Items occupy my_array[0, my_tail).  The prefix my_array[0, mark) is kept in
        binary-heap order under Compare: a child never outranks its parent, so with
        the default std::less the largest item sits at index 0.  Items in
        my_array[mark, my_tail) were pushed since the last heapify() and are not yet
        in heap order; handle_operations() heapifies them at the end of each batch.
        Only the .first member of array entries is moved around here, and this class
        tests emptiness through my_tail rather than through the entries' .second flags. */
    template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
    class priority_queue_node : public buffer_node<T, A> {
    public:
        typedef T input_type;
        typedef T output_type;
        typedef sender< input_type > predecessor_type;
        typedef receiver< output_type > successor_type;

        //! Constructor
    priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}

    protected:
        typedef typename buffer_node<T, A>::size_type size_type;
        typedef typename buffer_node<T, A>::item_type item_type;
        typedef typename buffer_node<T, A>::buffer_operation prio_operation;

        enum op_stat {WAIT=0, SUCCEEDED, FAILED};

        //! Executes one batch of queued operations (overrides buffer_node's handler).
        /** Operations run in list order.  Afterwards any unheaped items are
            heapified, and a forward_task is enqueued if an operation may have made an
            item forwardable and no forwarder is currently running. */
        /* override */ void handle_operations(prio_operation *op_list) {
            prio_operation *tmp /*, *pop_list*/ ;
            bool try_forwarding=false; // set by operations that may enable forwarding
            while (op_list) {
                tmp = op_list;
                op_list = op_list->next;
                switch (tmp->type) {
                case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
                case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
                case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
                case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
                case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
                case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
                case buffer_node<T, A>::req_item: internal_pop(tmp); break;
                case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
                }
            }
            // Pops get no special batch treatment; items pushed during this batch
            // are merged into the heap here, once per batch.
            if (mark<this->my_tail) heapify();
            if (try_forwarding && !this->forwarder_busy) {
                this->forwarder_busy = true;
                task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
            }
        }

        //! Tries to forward valid items to successors
        /** Offers my_array[0] at most once per registered successor.  Each accepted
            item is removed: the last item is moved to the root and sifted down.
            Reports SUCCEEDED (forwarder stays busy) only if something was accepted
            and every attempt was used; otherwise FAILED and forwarder_busy cleared.
            NOTE(review): when this runs after a push in the same batch (before the
            end-of-batch heapify), my_array[0] may not be the highest-priority item. */
        /* override */ void internal_forward(prio_operation *op) {
            T i_copy;
            bool success = false; // flagged when a successor accepts
            size_type counter = this->my_successors.size();

            if (this->my_reserved || this->my_tail == 0) {
                __TBB_store_with_release(op->status, FAILED);
                this->forwarder_busy = false;
                return;
            }
            // Keep trying to send while there exists an accepting successor
            while (counter>0 && this->my_tail > 0) {
                i_copy = this->my_array[0].first;
                bool msg = this->my_successors.try_put(i_copy);
                if ( msg == true ) {
                     if (mark == this->my_tail) --mark; // every item was heaped: heap shrinks with the tail
                    --(this->my_tail);
                    // move the last item to the root; reheap() sifts it down within [0, mark)
                    this->my_array[0].first=this->my_array[this->my_tail].first;
                    if (this->my_tail > 1) // don't reheap for heap of size 1
                        reheap();
                    success = true; // found an accepting successor
                }
                --counter;
            }
            if (success && !counter)
                __TBB_store_with_release(op->status, SUCCEEDED);
            else {
                __TBB_store_with_release(op->status, FAILED);
                this->forwarder_busy = false;
            }
        }

        //! Appends the item at my_tail, growing my_array when full.
        /** The new item stays outside the heap (index >= mark) until heapify(). */
        /* override */ void internal_push(prio_operation *op) {
            if ( this->my_tail >= this->my_array_size )
                this->grow_my_array( this->my_tail + 1 );
            this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
            ++(this->my_tail);
            __TBB_store_with_release(op->status, SUCCEEDED);
        }
        //! Removes the highest-priority item it can identify into *op->elem.
        /** FAILED while reserved or empty.  If unheaped items exist and the last one
            outranks the heap root, that last item is returned directly.  Otherwise the
            root is returned and replaced by the last item, which is sifted down.
            Only the last unheaped item is compared against the root. */
        /* override */ void internal_pop(prio_operation *op) {
            if ( this->my_reserved == true || this->my_tail == 0 ) {
                __TBB_store_with_release(op->status, FAILED);
            }
            else {
                if (mark<this->my_tail &&
                    compare(this->my_array[0].first,
                            this->my_array[this->my_tail-1].first)) {
                    // there are newly pushed elems; last one higher than top
                    // copy the data
                    *(op->elem) = this->my_array[this->my_tail-1].first;
                    --(this->my_tail);
                    __TBB_store_with_release(op->status, SUCCEEDED);
                }
                else { // extract and push the last element down heap
                    *(op->elem) = this->my_array[0].first; // copy the data
                    if (mark == this->my_tail) --mark;
                    --(this->my_tail);
                    __TBB_store_with_release(op->status, SUCCEEDED);
                    this->my_array[0].first=this->my_array[this->my_tail].first;
                    if (this->my_tail > 1) // don't reheap for heap of size 1
                        reheap();
                }
            }
        }
        //! Reserves the root item: copies it to *op->elem and reserved_item, then removes it.
        /** While my_reserved is set, pop, reserve and forward all fail. */
        /* override */ void internal_reserve(prio_operation *op) {
            if (this->my_reserved == true || this->my_tail == 0) {
                __TBB_store_with_release(op->status, FAILED);
            }
            else {
                this->my_reserved = true;
                *(op->elem) = reserved_item = this->my_array[0].first;
                if (mark == this->my_tail) --mark;
                --(this->my_tail);
                __TBB_store_with_release(op->status, SUCCEEDED);
                this->my_array[0].first = this->my_array[this->my_tail].first;
                if (this->my_tail > 1) // don't reheap for heap of size 1
                    reheap();
            }
        }
        //! Ends the reservation; the reserved item was already removed by internal_reserve.
        /* override */ void internal_consume(prio_operation *op) {
            this->my_reserved = false;
            __TBB_store_with_release(op->status, SUCCEEDED);
        }
        //! Ends the reservation by re-inserting reserved_item and re-heapifying.
        /* override */ void internal_release(prio_operation *op) {
            if (this->my_tail >= this->my_array_size)
                this->grow_my_array( this->my_tail + 1 );
            this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
            ++(this->my_tail);
            this->my_reserved = false;
            __TBB_store_with_release(op->status, SUCCEEDED);
            heapify();
        }
    private:
        Compare compare;            // compare(a, b) == true means b outranks a
        size_type mark;             // number of leading my_array entries in heap order
        input_type reserved_item;   // copy of the item handed out by internal_reserve

        //! Sifts every item in my_array[mark, my_tail) up into the heap.
        /** Leaves mark == my_tail.  mark is first raised to 1 because a single item
            is trivially a heap; both callers guarantee my_tail >= 1. */
        void heapify() {
            if (!mark) mark = 1;
            for (; mark<this->my_tail; ++mark) { // for each unheaped element
                size_type cur_pos = mark;
                input_type to_place = this->my_array[mark].first;
                do { // push to_place up the heap
                    size_type parent = (cur_pos-1)>>1;
                    if (!compare(this->my_array[parent].first, to_place))
                        break;
                    this->my_array[cur_pos].first = this->my_array[parent].first;
                    cur_pos = parent;
                } while( cur_pos );
                this->my_array[cur_pos].first = to_place;
            }
        }

        //! Sifts the value stored at my_array[my_tail] down from the root.
        /** Callers have just copied that value into my_array[0].  Only the heap
            prefix [0, mark) is considered; the value stops where no child outranks it. */
        void reheap() {
            size_type cur_pos=0, child=1;
            while (child < mark) {
                size_type target = child;
                if (child+1<mark &&
                    compare(this->my_array[child].first,
                            this->my_array[child+1].first))
                    ++target;
                // target now has the higher priority child
                if (compare(this->my_array[target].first,
                            this->my_array[this->my_tail].first))
                    break;
                this->my_array[cur_pos].first = this->my_array[target].first;
                cur_pos = target;
                child = (cur_pos<<1)+1;
            }
            this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
        }
    };
1901
1902     //! Forwards messages only if the threshold has not been reached
1903     /** This node forwards items until its threshold is reached.
1904         It contains no buffering.  If the downstream node rejects, the
1905         message is dropped. */
1906     template< typename T >
1907     class limiter_node : public graph_node, public receiver< T >, public sender< T >, internal::no_copy {
1908     public:
1909
1910         typedef T input_type;
1911         typedef T output_type;
1912         typedef sender< input_type > predecessor_type;
1913         typedef receiver< output_type > successor_type;
1914
1915     private:
1916
1917         task *my_root_task;
1918         size_t my_threshold;
1919         size_t my_count;
1920         internal::predecessor_cache< T > my_predecessors;
1921         spin_mutex my_mutex;
1922         internal::broadcast_cache< T > my_successors;
1923
1924         friend class internal::forward_task< limiter_node<T> >;
1925
1926         // Let decrementer call decrement_counter()
1927         friend class internal::decrementer< limiter_node<T> >;
1928
1929         void decrement_counter() {
1930             input_type v;
1931             
1932             // If we can't get / put an item immediately then drop the count
1933             if ( my_predecessors.get_item( v ) == false 
1934                  || my_successors.try_put(v) == false ) {
1935                 spin_mutex::scoped_lock lock(my_mutex);
1936                 --my_count;
1937                 if ( !my_predecessors.empty() ) 
1938                     task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1939                                 internal::forward_task< limiter_node<T> >( *this ) );
1940             }
1941         }
1942
1943         void forward() {
1944             {
1945                 spin_mutex::scoped_lock lock(my_mutex);
1946                 if ( my_count < my_threshold ) 
1947                     ++my_count;
1948                 else
1949                     return;
1950             }
1951             decrement_counter();
1952         }
1953
1954     public:
1955
1956         //! The internal receiver< continue_msg > that decrements the count
1957         internal::decrementer< limiter_node<T> > decrement;
1958
1959         //! Constructor
1960         limiter_node( graph &g, size_t threshold, int number_of_decrement_predecessors = 0 ) : 
1961            my_root_task(g.root_task()), my_threshold(threshold), my_count(0), decrement(number_of_decrement_predecessors) {
1962             my_predecessors.set_owner(this);
1963             my_successors.set_owner(this);
1964             decrement.set_owner(this);
1965         }
1966
1967         //! Replace the current successor with this new successor
1968         /* override */ bool register_successor( receiver<output_type> &r ) {
1969             my_successors.register_successor(r);
1970             return true;
1971         }
1972
1973         //! Removes a successor from this node
1974         /** r.remove_predecessor(*this) is also called. */
1975         /* override */ bool remove_successor( receiver<output_type> &r ) {
1976             r.remove_predecessor(*this);
1977             my_successors.remove_successor(r);
1978             return true;
1979         }
1980
1981         //! Puts an item to this receiver
1982         /* override */ bool try_put( T t ) {
1983             {
1984                 spin_mutex::scoped_lock lock(my_mutex);
1985                 if ( my_count >= my_threshold ) 
1986                     return false;
1987                 else
1988                     ++my_count; 
1989             }
1990
1991             bool msg = my_successors.try_put(t);
1992
1993             if ( msg != true ) {
1994                 spin_mutex::scoped_lock lock(my_mutex);
1995                 --my_count;
1996                 if ( !my_predecessors.empty() ) 
1997                     task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1998                                 internal::forward_task< limiter_node<T> >( *this ) );
1999             }
2000
2001             return msg;
2002         }
2003
2004         //! Removes src from the list of cached predecessors.
2005         /* override */ bool register_predecessor( predecessor_type &src ) {
2006             spin_mutex::scoped_lock lock(my_mutex);
2007             my_predecessors.add( src );
2008             if ( my_count < my_threshold && !my_successors.empty() ) 
2009                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
2010                                internal::forward_task< limiter_node<T> >( *this ) );
2011             return true;
2012         }
2013
2014         //! Removes src from the list of cached predecessors.
2015         /* override */ bool remove_predecessor( predecessor_type &src ) {
2016             my_predecessors.remove( src );
2017             return true;
2018         }
2019
2020     };
2021
2022     namespace internal {
2023
2024     struct forwarding_base {
2025         forwarding_base(task *rt) : my_root_task(rt) {}
2026         virtual ~forwarding_base() {}
2027         virtual void decrement_port_count() = 0;
2028         virtual void increment_port_count() = 0;
2029         // moved here so input ports can queue tasks
2030         task* my_root_task;
2031     };
2032
2033     template< int N >
2034     struct join_helper {
2035
2036         template< typename TupleType, typename PortType >
2037         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
2038             std::get<N-1>( my_input ).set_join_node_pointer(port);
2039             join_helper<N-1>::set_join_node_pointer( my_input, port );
2040         }
2041         template< typename TupleType >
2042         static inline void consume_reservations( TupleType &my_input ) {
2043             std::get<N-1>( my_input ).consume();
2044             join_helper<N-1>::consume_reservations( my_input );
2045         }
2046
2047         template< typename TupleType >
2048         static inline void release_my_reservation( TupleType &my_input ) {
2049             std::get<N-1>( my_input ).release();
2050         }
2051
2052         template <typename TupleType>
2053         static inline void release_reservations( TupleType &my_input) {
2054             join_helper<N-1>::release_reservations(my_input);
2055             release_my_reservation(my_input);
2056         }
2057
2058         template< typename InputTuple, typename OutputTuple >
2059         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
2060             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
2061             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
2062                 release_my_reservation( my_input );
2063                 return false;
2064             }
2065             return true;
2066         }
2067
2068         template<typename InputTuple, typename OutputTuple>
2069         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
2070             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
2071             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
2072         }
2073
2074         template<typename InputTuple, typename OutputTuple>
2075         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
2076             return get_my_item(my_input, out);
2077         }
2078
2079         template<typename InputTuple>
2080         static inline void reset_my_port(InputTuple &my_input) {
2081             join_helper<N-1>::reset_my_port(my_input);
2082             std::get<N-1>(my_input).reset_port();
2083         }
2084
2085         template<typename InputTuple>
2086         static inline void reset_ports(InputTuple& my_input) {
2087             reset_my_port(my_input);
2088         }
2089     };
2090
2091     template< >
2092     struct join_helper<1> {
2093
2094         template< typename TupleType, typename PortType >
2095         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
2096             std::get<0>( my_input ).set_join_node_pointer(port);
2097         }
2098
2099         template< typename TupleType >
2100         static inline void consume_reservations( TupleType &my_input ) {
2101             std::get<0>( my_input ).consume();
2102         }
2103
2104         template< typename TupleType >
2105         static inline void release_my_reservation( TupleType &my_input ) {
2106             std::get<0>( my_input ).release();
2107         }
2108         
2109         template<typename TupleType>
2110         static inline void release_reservations( TupleType &my_input) {
2111             release_my_reservation(my_input);
2112         }
2113
2114         template< typename InputTuple, typename OutputTuple >
2115         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
2116             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
2117         }
2118
2119         template<typename InputTuple, typename OutputTuple>
2120         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
2121             return std::get<0>(my_input).get_item(std::get<0>(out));
2122         }
2123
2124         template<typename InputTuple, typename OutputTuple>
2125         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
2126             return get_my_item(my_input, out);
2127         }
2128
2129         template<typename InputTuple>
2130         static inline void reset_my_port(InputTuple &my_input) {
2131             std::get<0>(my_input).reset_port();
2132         }
2133
2134         template<typename InputTuple>
2135         static inline void reset_ports(InputTuple& my_input) {
2136             reset_my_port(my_input);
2137         }
2138     };
2139
2140     namespace join_policy_namespace {
2141         enum join_policy { reserving
2142             , queueing
2143         };
2144     }
2145     using namespace join_policy_namespace;
2146
2147     //! The two-phase join port
2148     template< typename T >
2149     class reserving_port : public receiver<T> {
2150     public:
2151         typedef T input_type;
2152         typedef sender<T> predecessor_type;
2153     private:
2154         // ----------- Aggregator ------------
2155         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res };
2156         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2157         typedef reserving_port<T> my_class;
2158
2159         class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
2160         public:
2161             char type;
2162             union {
2163                 T *my_arg;
2164                 predecessor_type *my_pred;
2165             };
2166             reserving_port_operation(const T& e, op_type t) :
2167                 type(char(t)), my_arg(const_cast<T*>(&e)) {}
2168             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)), 
2169                 my_pred(const_cast<predecessor_type *>(&s)) {}
2170             reserving_port_operation(op_type t) : type(char(t)) {}
2171         };
2172
2173         typedef internal::aggregating_functor<my_class, reserving_port_operation> my_handler;
2174         friend class internal::aggregating_functor<my_class, reserving_port_operation>;
2175         aggregator<my_handler, reserving_port_operation> my_aggregator;
2176
2177         void handle_operations(reserving_port_operation* op_list) {
2178             reserving_port_operation *current;
2179             bool no_predecessors;
2180             while(op_list) {
2181                 current = op_list;
2182                 op_list = op_list->next;
2183                 switch(current->type) {
2184                 case reg_pred:
2185                     no_predecessors = my_predecessors.empty();
2186                     my_predecessors.add(*(current->my_pred));
2187                     if ( no_predecessors ) {
2188                         my_join->decrement_port_count( ); // may try to forward
2189                     }
2190                     __TBB_store_with_release(current->status, SUCCEEDED);
2191                     break;
2192                 case rem_pred:
2193                     my_predecessors.remove(*(current->my_pred));
2194                     if(my_predecessors.empty()) my_join->increment_port_count();
2195                     __TBB_store_with_release(current->status, SUCCEEDED);
2196                     break;
2197                 case res_item:
2198                     if ( reserved ) {
2199                         __TBB_store_with_release(current->status, FAILED);
2200                     }
2201                     else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
2202                         reserved = true;
2203                         __TBB_store_with_release(current->status, SUCCEEDED);
2204                     } else {
2205                         if ( my_predecessors.empty() ) {
2206                             my_join->increment_port_count();
2207                         }
2208                         __TBB_store_with_release(current->status, FAILED);
2209                     }
2210                     break;
2211                 case rel_res:
2212                     reserved = false;
2213                     my_predecessors.try_release( );
2214                     __TBB_store_with_release(current->status, SUCCEEDED);
2215                     break;
2216                 case con_res:
2217                     reserved = false;
2218                     my_predecessors.try_consume( );
2219                     __TBB_store_with_release(current->status, SUCCEEDED);
2220                     break;
2221                 }
2222             }
2223         }
2224
2225     public:
2226
2227         //! Constructor
2228         reserving_port() : reserved(false) {
2229             my_join = NULL;
2230             my_predecessors.set_owner( this );
2231             my_aggregator.initialize_handler(my_handler(this));
2232         }
2233
2234         // copy constructor
2235         reserving_port(const reserving_port& /* other */) : receiver<T>() {
2236             reserved = false;
2237             my_join = NULL;
2238             my_predecessors.set_owner( this );
2239             my_aggregator.initialize_handler(my_handler(this));
2240         }
2241
2242         void set_join_node_pointer(forwarding_base *join) {
2243             my_join = join;
2244         }
2245
2246         // always rejects, so arc is reversed (and reserves can be done.)
2247         bool try_put( T ) {
2248             return false;
2249         }
2250
2251         //! Add a predecessor
2252         bool register_predecessor( sender<T> &src ) {
2253             reserving_port_operation op_data(src, reg_pred);
2254             my_aggregator.execute(&op_data);
2255             return op_data.status == SUCCEEDED;
2256         }
2257
2258         //! Remove a predecessor
2259         bool remove_predecessor( sender<T> &src ) {
2260             reserving_port_operation op_data(src, rem_pred);
2261             my_aggregator.execute(&op_data);
2262             return op_data.status == SUCCEEDED;
2263         }
2264
2265         //! Reserve an item from the port
2266         bool reserve( T &v ) {
2267             reserving_port_operation op_data(v, res_item);
2268             my_aggregator.execute(&op_data);
2269             return op_data.status == SUCCEEDED;
2270         }
2271
2272         //! Release the port
2273         void release( ) {
2274             reserving_port_operation op_data(rel_res);
2275             my_aggregator.execute(&op_data);
2276         }
2277
2278         //! Complete use of the port
2279         void consume( ) {
2280             reserving_port_operation op_data(con_res);
2281             my_aggregator.execute(&op_data);
2282         }
2283
2284     private:
2285         forwarding_base *my_join;
2286         reservable_predecessor_cache< T, null_mutex > my_predecessors;
2287         bool reserved;
2288     };
2289
2290     //! queueing join_port
2291     template<typename T>
2292     class queueing_port : public receiver<T>, public item_buffer<T> {
2293     public:
2294         typedef T input_type;
2295         typedef sender<T> predecessor_type;
2296         typedef queueing_port<T> my_node_type;
2297
2298 // ----------- Aggregator ------------
2299     private:
2300         enum op_type { try__put, get__item, res_port };
2301         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2302         typedef queueing_port<T> my_class;
2303
2304         class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
2305         public:
2306             char type;
2307             union {
2308                 T my_val;
2309                 T *my_arg;
2310             };
2311             // constructor for value parameter
2312             queueing_port_operation(const T& e, op_type t) :
2313                 // type(char(t)), my_val(const_cast<T>(e)) {}
2314                 type(char(t)), my_val(e) {}
2315             // constructor for pointer parameter
2316             queueing_port_operation(const T* p, op_type t) :
2317                 type(char(t)), my_arg(const_cast<T*>(p)) {}
2318             // constructor with no parameter
2319             queueing_port_operation(op_type t) : type(char(t)) {}
2320         };
2321
2322         typedef internal::aggregating_functor<my_class, queueing_port_operation> my_handler;
2323         friend class internal::aggregating_functor<my_class, queueing_port_operation>;
2324         aggregator<my_handler, queueing_port_operation> my_aggregator;
2325
2326         void handle_operations(queueing_port_operation* op_list) {
2327             queueing_port_operation *current;
2328             bool was_empty;
2329             while(op_list) {
2330                 current = op_list;
2331                 op_list = op_list->next;
2332                 switch(current->type) {
2333                 case try__put:
2334                     was_empty = this->buffer_empty();
2335                     this->push_back(current->my_val);
2336                     if (was_empty) my_join->decrement_port_count();
2337                     __TBB_store_with_release(current->status, SUCCEEDED);
2338                     break;
2339                 case get__item:
2340                     if(!this->buffer_empty()) {
2341                         this->fetch_front(*(current->my_arg));
2342                         __TBB_store_with_release(current->status, SUCCEEDED);
2343                     }
2344                     else {
2345                         __TBB_store_with_release(current->status, FAILED);
2346                     }
2347                     break;
2348                 case res_port:
2349                     __TBB_ASSERT(this->item_valid(this->my_head), "No item to reset");
2350                     this->invalidate_front(); ++(this->my_head);
2351                     if(this->item_valid(this->my_head)) {
2352                         my_join->decrement_port_count();
2353                     }
2354                     __TBB_store_with_release(current->status, SUCCEEDED);
2355                     break;
2356                 }
2357             }
2358         }
2359 // ------------ End Aggregator ---------------
2360     public:
2361
2362         //! Constructor
2363         queueing_port() : item_buffer<T>() {
2364             my_join = NULL;
2365             my_aggregator.initialize_handler(my_handler(this));
2366         }
2367
2368         //! copy constructor
2369         queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
2370             my_join = NULL;
2371             my_aggregator.initialize_handler(my_handler(this));
2372         }
2373
2374         //! record parent for tallying available items
2375         void set_join_node_pointer(forwarding_base *join) {
2376             my_join = join;
2377         }
2378
2379         /*override*/bool try_put(T v) {
2380             queueing_port_operation op_data(v, try__put);
2381             my_aggregator.execute(&op_data);
2382             return op_data.status == SUCCEEDED;
2383         }
2384
2385
2386         bool get_item( T &v ) {
2387             queueing_port_operation op_data(&v, get__item);
2388             my_aggregator.execute(&op_data);
2389             return op_data.status == SUCCEEDED;
2390         }
2391
2392         // reset_port is called when item is accepted by successor, but
2393         // is initiated by join_node.
2394         void reset_port() {
2395             queueing_port_operation op_data(res_port);
2396             my_aggregator.execute(&op_data);
2397             return;
2398         }
2399
2400     private:
2401         forwarding_base *my_join;
2402     };
2403
2404     template<join_policy JP, typename InputTuple, typename OutputTuple>
2405     class join_node_base;
2406
2407     //! join_node_FE : implements input port policy
2408     template<join_policy JP, typename InputTuple, typename OutputTuple>
2409     class join_node_FE;
2410
2411     template<typename InputTuple, typename OutputTuple>
2412     class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
2413     public:
2414         static const int N = std::tuple_size<OutputTuple>::value;
2415         typedef OutputTuple output_type;
2416         typedef InputTuple input_type;
2417         typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_type; // for forwarding
2418
2419         join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
2420             ports_with_no_inputs = N;
2421             join_helper<N>::set_join_node_pointer(my_inputs, this);
2422         }
2423
2424         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
2425
2426        void increment_port_count() {
2427             ++ports_with_no_inputs;
2428         }
2429
2430         // if all input_ports have predecessors, spawn forward to try and consume tuples
2431         void decrement_port_count() {
2432             if(ports_with_no_inputs.fetch_and_decrement() == 1) {
2433                 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
2434                     forward_task<my_node_type>(*my_node) );
2435             }
2436         }
2437
2438         input_type &inputs() { return my_inputs; }
2439     protected:
2440         // all methods on input ports should be called under mutual exclusion from join_node_base.
2441
2442         bool tuple_build_may_succeed() {
2443             return !ports_with_no_inputs;
2444         }
2445
2446         bool try_to_make_tuple(output_type &out) {
2447             if(ports_with_no_inputs) return false;
2448             return join_helper<N>::reserve(my_inputs, out);
2449         }
2450
2451         void tuple_accepted() {
2452             join_helper<N>::consume_reservations(my_inputs);
2453         }
2454         void tuple_rejected() {
2455             join_helper<N>::release_reservations(my_inputs);
2456         }
2457
2458         input_type my_inputs;
2459         my_node_type *my_node;
2460         atomic<size_t> ports_with_no_inputs;
2461     };
2462
2463     template<typename InputTuple, typename OutputTuple>
2464     class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
2465     public:
2466         static const int N = std::tuple_size<OutputTuple>::value;
2467         typedef OutputTuple output_type;
2468         typedef InputTuple input_type;
2469         typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_type; // for forwarding
2470
2471         join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
2472             ports_with_no_items = N;
2473             join_helper<N>::set_join_node_pointer(my_inputs, this);
2474         }
2475
2476         // needed for forwarding
2477         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
2478
2479         void reset_port_count() {
2480             ports_with_no_items = N;
2481         }
2482
2483         // if all input_ports have items, spawn forward to try and consume tuples
2484         void decrement_port_count() {
2485             if(ports_with_no_items.fetch_and_decrement() == 1) {
2486                 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
2487                     forward_task<my_node_type>(*my_node) );
2488             }
2489         }
2490
2491         void increment_port_count() { __TBB_ASSERT(false, NULL); }  // should never be called
2492
2493         input_type &inputs() { return my_inputs; }
2494     protected:
2495         // all methods on input ports should be called under mutual exclusion from join_node_base.
2496
2497         bool tuple_build_may_succeed() {
2498             return !ports_with_no_items;
2499         }
2500
2501         bool try_to_make_tuple(output_type &out) {
2502             if(ports_with_no_items) return false;
2503             return join_helper<N>::get_items(my_inputs, out);
2504         }
2505
2506         void tuple_accepted() {
2507             reset_port_count();
2508             join_helper<N>::reset_ports(my_inputs);
2509         }
2510         void tuple_rejected() {
2511             // nothing to do.
2512         }
2513
2514         input_type my_inputs;
2515         my_node_type *my_node;
2516         atomic<size_t> ports_with_no_items;
2517     };
2518
2519     //! join_node_base
2520     template<join_policy JP, typename InputTuple, typename OutputTuple>
2521     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
2522                            public sender<OutputTuple>, no_copy {
2523     public:
2524         typedef OutputTuple output_type;
2525
2526         typedef receiver<output_type> successor_type;
2527         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
2528         using input_ports_type::tuple_build_may_succeed;
2529         using input_ports_type::try_to_make_tuple;
2530         using input_ports_type::tuple_accepted;
2531         using input_ports_type::tuple_rejected;
2532
2533     private:
2534         // ----------- Aggregator ------------
2535         enum op_type { reg_succ, rem_succ, try__get, do_fwrd };
2536         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2537         typedef join_node_base<JP,InputTuple,OutputTuple> my_class;
2538
2539         class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
2540         public:
2541             char type;
2542             union {
2543                 output_type *my_arg;
2544                 successor_type *my_succ;
2545             };
2546             join_node_base_operation(const output_type& e, op_type t) :
2547                 type(char(t)), my_arg(const_cast<output_type*>(&e)) {}
2548             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)), 
2549                 my_succ(const_cast<successor_type *>(&s)) {}
2550             join_node_base_operation(op_type t) : type(char(t)) {}
2551         };
2552
2553         typedef internal::aggregating_functor<my_class, join_node_base_operation> my_handler;
2554         friend class internal::aggregating_functor<my_class, join_node_base_operation>;
2555         bool forwarder_busy;
2556         aggregator<my_handler, join_node_base_operation> my_aggregator;
2557
2558         void handle_operations(join_node_base_operation* op_list) {
2559             join_node_base_operation *current;
2560             while(op_list) {
2561                 current = op_list;
2562                 op_list = op_list->next;
2563                 switch(current->type) {
2564                 case reg_succ:
2565                     my_successors.register_successor(*(current->my_succ));
2566                     if(tuple_build_may_succeed() && !forwarder_busy) {
2567                         task::enqueue( * new ( task::allocate_additional_child_of(*(this->my_root_task)) )
2568                                 forward_task<join_node_base<JP,InputTuple,OutputTuple> >(*this));
2569                         forwarder_busy = true;
2570                     }
2571                     __TBB_store_with_release(current->status, SUCCEEDED);
2572                     break;
2573                 case rem_succ:
2574                     my_successors.remove_successor(*(current->my_succ));
2575                     __TBB_store_with_release(current->status, SUCCEEDED);
2576                     break;
2577                 case try__get:
2578                     if(tuple_build_may_succeed()) {
2579                         if(try_to_make_tuple(*(current->my_arg))) {
2580                             tuple_accepted();
2581                             __TBB_store_with_release(current->status, SUCCEEDED);
2582                         }
2583                         else __TBB_store_with_release(current->status, FAILED);
2584                     }
2585                     else __TBB_store_with_release(current->status, FAILED);
2586                     break;
2587                 case do_fwrd: {
2588                         bool build_succeeded;
2589                         output_type out;
2590                         if(tuple_build_may_succeed()) {
2591                             do {
2592                                 build_succeeded = try_to_make_tuple(out);
2593                                 if(build_succeeded) {
2594                                     if(my_successors.try_put(out)) {
2595                                         tuple_accepted();
2596                                     }
2597                                     else {
2598                                         tuple_rejected();
2599                                         build_succeeded = false;
2600                                     }
2601                                 }
2602                             } while(build_succeeded);
2603                         }
2604                         __TBB_store_with_release(current->status, SUCCEEDED);
2605                         forwarder_busy = false;
2606                     }
2607                     break;
2608                 }
2609             }
2610         }
2611         // ---------- end aggregator -----------
2612     public:
2613
2614         join_node_base(graph &g) : input_ports_type(g), forwarder_busy(false) {
2615             my_successors.set_owner(this);
2616             input_ports_type::set_my_node(this);
2617             my_aggregator.initialize_handler(my_handler(this));
2618         }
2619
2620         bool register_successor(successor_type &r) {
2621             join_node_base_operation op_data(r, reg_succ);
2622             my_aggregator.execute(&op_data);
2623             return op_data.status == SUCCEEDED;
2624         }
2625
2626         bool remove_successor( successor_type &r) {
2627             join_node_base_operation op_data(r, rem_succ);
2628             my_aggregator.execute(&op_data);
2629             return op_data.status == SUCCEEDED;
2630         }
2631
2632         bool try_get( output_type &v) {
2633             join_node_base_operation op_data(v, try__get);
2634             my_aggregator.execute(&op_data);
2635             return op_data.status == SUCCEEDED;
2636         }
2637
2638     private:
2639         broadcast_cache<output_type, null_rw_mutex> my_successors;
2640
2641         friend class forward_task< join_node_base<JP, InputTuple, OutputTuple> >;
2642
2643         void forward() {
2644             join_node_base_operation op_data(do_fwrd);
2645             my_aggregator.execute(&op_data);
2646         }
2647     };
2648
2649     //! unfolded_join_node : passes input_ports_tuple_type to join_node_base.  We build the input port type
2650     //  using tuple_element.
2651     template<int N, typename OutputTuple, join_policy JP>
2652     class unfolded_join_node;
2653
2654     template<typename OutputTuple>
2655     class unfolded_join_node<2,OutputTuple,reserving> : public internal::join_node_base<reserving,
2656         std::tuple<
2657                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2658                 reserving_port<typename std::tuple_element<1,OutputTuple>::type> >,
2659         OutputTuple
2660                   >
2661                   {
2662     public:
2663         typedef typename std::tuple<
2664                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2665                 reserving_port<typename std::tuple_element<1,OutputTuple>::type> > input_ports_tuple_type;
2666         typedef OutputTuple output_type;
2667     private:
2668         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2669     public:
2670         unfolded_join_node(graph &g) : base_type(g) {}
2671     };
2672
2673     template<typename OutputTuple>
2674     class unfolded_join_node<3,OutputTuple,reserving> : public internal::join_node_base<reserving,
2675         std::tuple<
2676                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2677                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2678                 reserving_port<typename std::tuple_element<2,OutputTuple>::type> >,
2679         OutputTuple
2680                     >
2681                     {
2682     public:
2683         typedef typename std::tuple<
2684                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2685                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2686                 reserving_port<typename std::tuple_element<2,OutputTuple>::type> > input_ports_tuple_type;
2687         typedef OutputTuple output_type;
2688     private:
2689         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2690     public:
2691         unfolded_join_node(graph &g) : base_type(g) {}
2692     };
2693
2694     template<typename OutputTuple>
2695     class unfolded_join_node<4,OutputTuple,reserving> : public internal::join_node_base<reserving,
2696         std::tuple<
2697                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2698                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2699                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2700                 reserving_port<typename std::tuple_element<3,OutputTuple>::type> >,
2701         OutputTuple
2702                     > {
2703     public:
2704         typedef typename std::tuple<
2705                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2706                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2707                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2708                 reserving_port<typename std::tuple_element<3,OutputTuple>::type> > input_ports_tuple_type;
2709         typedef OutputTuple output_type;
2710     private:
2711         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2712     public:
2713         unfolded_join_node(graph &g) : base_type(g) {}
2714     };
2715
2716     template<typename OutputTuple>
2717     class unfolded_join_node<5,OutputTuple,reserving> : public internal::join_node_base<reserving,
2718         std::tuple<
2719                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2720                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2721                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2722                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2723                 reserving_port<typename std::tuple_element<4,OutputTuple>::type> >,
2724         OutputTuple
2725                 > {
2726     public:
2727         typedef typename std::tuple<
2728                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2729                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2730                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2731                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2732                 reserving_port<typename std::tuple_element<4,OutputTuple>::type> > input_ports_tuple_type;
2733         typedef OutputTuple output_type;
2734     private:
2735         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2736     public:
2737         unfolded_join_node(graph &g) : base_type(g) {}
2738     };
2739
2740     template<typename OutputTuple>
2741     class unfolded_join_node<6,OutputTuple,reserving> : public internal::join_node_base<reserving,
2742         std::tuple<
2743                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2744                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2745                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2746                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2747                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2748                 reserving_port<typename std::tuple_element<5,OutputTuple>::type> >,
2749         OutputTuple
2750                     > {
2751     public:
2752         typedef typename std::tuple<
2753                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2754                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2755                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2756                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2757                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>, 
2758                 reserving_port<typename std::tuple_element<5,OutputTuple>::type> > input_ports_tuple_type;
2759         typedef OutputTuple output_type;
2760     private:
2761         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2762     public:
2763         unfolded_join_node(graph &g) : base_type(g) {}
2764     };
2765
2766     template<typename OutputTuple>
2767     class unfolded_join_node<7,OutputTuple,reserving> : public internal::join_node_base<reserving,
2768         std::tuple<
2769                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2770                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2771                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2772                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2773                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2774                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2775                 reserving_port<typename std::tuple_element<6,OutputTuple>::type> >,
2776         OutputTuple
2777                 > {
2778     public:
2779         typedef typename std::tuple<
2780                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2781                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2782                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2783                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2784                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>, 
2785                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>, 
2786                 reserving_port<typename std::tuple_element<6,OutputTuple>::type> > input_ports_tuple_type;
2787         typedef OutputTuple output_type;
2788     private:
2789         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2790     public:
2791         unfolded_join_node(graph &g) : base_type(g) {}
2792     };
2793
2794     template<typename OutputTuple>
2795     class unfolded_join_node<8,OutputTuple,reserving> : public internal::join_node_base<reserving,
2796         std::tuple<
2797                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2798                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2799                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2800                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2801                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2802                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2803                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2804                 reserving_port<typename std::tuple_element<7,OutputTuple>::type> >,
2805         OutputTuple
2806                 > {
2807     public:
2808         typedef typename std::tuple<
2809                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2810                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2811                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2812                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2813                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>, 
2814                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>, 
2815                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>, 
2816                 reserving_port<typename std::tuple_element<7,OutputTuple>::type> > input_ports_tuple_type;
2817         typedef OutputTuple output_type;
2818     private:
2819         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2820     public:
2821         unfolded_join_node(graph &g) : base_type(g) {}
2822     };
2823
2824     template<typename OutputTuple>
2825     class unfolded_join_node<9,OutputTuple,reserving> : public internal::join_node_base<reserving,
2826         std::tuple<
2827                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2828                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2829                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2830                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2831                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2832                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2833                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2834                 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2835                 reserving_port<typename std::tuple_element<8,OutputTuple>::type> >,
2836         OutputTuple
2837                 > {
2838     public:
2839         typedef typename std::tuple<
2840                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2841                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2842                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2843                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2844                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>, 
2845                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>, 
2846                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>, 
2847                 reserving_port<typename std::tuple_element<7,OutputTuple>::type>, 
2848                 reserving_port<typename std::tuple_element<8,OutputTuple>::type> > input_ports_tuple_type;
2849         typedef OutputTuple output_type;
2850     private:
2851         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2852     public:
2853         unfolded_join_node(graph &g) : base_type(g) {}
2854     };
2855
2856     template<typename OutputTuple>
2857     class unfolded_join_node<10,OutputTuple,reserving> : public internal::join_node_base<reserving,
2858         std::tuple<
2859                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2860                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2861                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2862                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2863                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2864                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2865                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2866                 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2867                 reserving_port<typename std::tuple_element<8,OutputTuple>::type>,
2868                 reserving_port<typename std::tuple_element<9,OutputTuple>::type> >,
2869         OutputTuple
2870                 > {
2871     public:
2872         typedef typename std::tuple<
2873                 reserving_port<typename std::tuple_element<0,OutputTuple>::type>, 
2874                 reserving_port<typename std::tuple_element<1,OutputTuple>::type>, 
2875                 reserving_port<typename std::tuple_element<2,OutputTuple>::type>, 
2876                 reserving_port<typename std::tuple_element<3,OutputTuple>::type>, 
2877                 reserving_port<typename std::tuple_element<4,OutputTuple>::type>, 
2878                 reserving_port<typename std::tuple_element<5,OutputTuple>::type>, 
2879                 reserving_port<typename std::tuple_element<6,OutputTuple>::type>, 
2880                 reserving_port<typename std::tuple_element<7,OutputTuple>::type>, 
2881                 reserving_port<typename std::tuple_element<8,OutputTuple>::type>, 
2882                 reserving_port<typename std::tuple_element<9,OutputTuple>::type> > input_ports_tuple_type;
2883         typedef OutputTuple output_type;
2884     private:
2885         typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2886     public:
2887         unfolded_join_node(graph &g) : base_type(g) {}
2888     };
2889
2890     template<typename OutputTuple>
2891     class unfolded_join_node<2,OutputTuple,queueing> : public internal::join_node_base<queueing,
2892         std::tuple<
2893                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2894                 queueing_port<typename std::tuple_element<1,OutputTuple>::type> >,
2895         OutputTuple
2896                   >
2897                   {
2898     public:
2899         typedef typename std::tuple<
2900                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2901                 queueing_port<typename std::tuple_element<1,OutputTuple>::type> > input_ports_tuple_type;
2902         typedef OutputTuple output_type;
2903     private:
2904         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2905     public:
2906         unfolded_join_node(graph &g) : base_type(g) {}
2907     };
2908
2909     template<typename OutputTuple>
2910     class unfolded_join_node<3,OutputTuple,queueing> : public internal::join_node_base<queueing,
2911         std::tuple<
2912                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2913                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2914                 queueing_port<typename std::tuple_element<2,OutputTuple>::type> >,
2915         OutputTuple
2916                     >
2917                     {
2918     public:
2919         typedef typename std::tuple<
2920                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2921                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
2922                 queueing_port<typename std::tuple_element<2,OutputTuple>::type> > input_ports_tuple_type;
2923         typedef OutputTuple output_type;
2924     private:
2925         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2926     public:
2927         unfolded_join_node(graph &g) : base_type(g) {}
2928     };
2929
2930     template<typename OutputTuple>
2931     class unfolded_join_node<4,OutputTuple,queueing> : public internal::join_node_base<queueing,
2932         std::tuple<
2933                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2934                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2935                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2936                 queueing_port<typename std::tuple_element<3,OutputTuple>::type> >,
2937         OutputTuple
2938                     > {
2939     public:
2940         typedef typename std::tuple<
2941                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2942                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
2943                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
2944                 queueing_port<typename std::tuple_element<3,OutputTuple>::type> > input_ports_tuple_type;
2945         typedef OutputTuple output_type;
2946     private:
2947         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2948     public:
2949         unfolded_join_node(graph &g) : base_type(g) {}
2950     };
2951
2952     template<typename OutputTuple>
2953     class unfolded_join_node<5,OutputTuple,queueing> : public internal::join_node_base<queueing,
2954         std::tuple<
2955                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2956                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2957                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2958                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2959                 queueing_port<typename std::tuple_element<4,OutputTuple>::type> >,
2960         OutputTuple
2961                 > {
2962     public:
2963         typedef typename std::tuple<
2964                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2965                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
2966                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
2967                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
2968                 queueing_port<typename std::tuple_element<4,OutputTuple>::type> > input_ports_tuple_type;
2969         typedef OutputTuple output_type;
2970     private:
2971         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2972     public:
2973         unfolded_join_node(graph &g) : base_type(g) {}
2974     };
2975
2976     template<typename OutputTuple>
2977     class unfolded_join_node<6,OutputTuple,queueing> : public internal::join_node_base<queueing,
2978         std::tuple<
2979                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2980                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2981                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2982                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2983                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
2984                 queueing_port<typename std::tuple_element<5,OutputTuple>::type> >,
2985         OutputTuple
2986                     > {
2987     public:
2988         typedef typename std::tuple<
2989                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
2990                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
2991                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
2992                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
2993                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>, 
2994                 queueing_port<typename std::tuple_element<5,OutputTuple>::type> > input_ports_tuple_type;
2995         typedef OutputTuple output_type;
2996     private:
2997         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2998     public:
2999         unfolded_join_node(graph &g) : base_type(g) {}
3000     };
3001
3002     template<typename OutputTuple>
3003     class unfolded_join_node<7,OutputTuple,queueing> : public internal::join_node_base<queueing,
3004         std::tuple<
3005                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3006                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3007                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3008                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3009                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3010                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3011                 queueing_port<typename std::tuple_element<6,OutputTuple>::type> >,
3012         OutputTuple
3013                 > {
3014     public:
3015         typedef typename std::tuple<
3016                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
3017                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
3018                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
3019                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
3020                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>, 
3021                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>, 
3022                 queueing_port<typename std::tuple_element<6,OutputTuple>::type> > input_ports_tuple_type;
3023         typedef OutputTuple output_type;
3024     private:
3025         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3026     public:
3027         unfolded_join_node(graph &g) : base_type(g) {}
3028     };
3029
3030     template<typename OutputTuple>
3031     class unfolded_join_node<8,OutputTuple,queueing> : public internal::join_node_base<queueing,
3032         std::tuple<
3033                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3034                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3035                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3036                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3037                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3038                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3039                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3040                 queueing_port<typename std::tuple_element<7,OutputTuple>::type> >,
3041         OutputTuple
3042                 > {
3043     public:
3044         typedef typename std::tuple<
3045                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
3046                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
3047                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
3048                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
3049                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>, 
3050                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>, 
3051                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>, 
3052                 queueing_port<typename std::tuple_element<7,OutputTuple>::type> > input_ports_tuple_type;
3053         typedef OutputTuple output_type;
3054     private:
3055         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3056     public:
3057         unfolded_join_node(graph &g) : base_type(g) {}
3058     };
3059
3060     template<typename OutputTuple>
3061     class unfolded_join_node<9,OutputTuple,queueing> : public internal::join_node_base<queueing,
3062         std::tuple<
3063                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3064                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3065                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3066                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3067                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3068                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3069                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3070                 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3071                 queueing_port<typename std::tuple_element<8,OutputTuple>::type> >,
3072         OutputTuple
3073                 > {
3074     public:
3075         typedef typename std::tuple<
3076                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
3077                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
3078                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
3079                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
3080                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>, 
3081                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>, 
3082                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>, 
3083                 queueing_port<typename std::tuple_element<7,OutputTuple>::type>, 
3084                 queueing_port<typename std::tuple_element<8,OutputTuple>::type> > input_ports_tuple_type;
3085         typedef OutputTuple output_type;
3086     private:
3087         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3088     public:
3089         unfolded_join_node(graph &g) : base_type(g) {}
3090     };
3091
3092     template<typename OutputTuple>
3093     class unfolded_join_node<10,OutputTuple,queueing> : public internal::join_node_base<queueing,
3094         std::tuple<
3095                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3096                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3097                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3098                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3099                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3100                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3101                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3102                 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3103                 queueing_port<typename std::tuple_element<8,OutputTuple>::type>,
3104                 queueing_port<typename std::tuple_element<9,OutputTuple>::type> >,
3105         OutputTuple
3106                 > {
3107     public:
3108         typedef typename std::tuple<
3109                 queueing_port<typename std::tuple_element<0,OutputTuple>::type>, 
3110                 queueing_port<typename std::tuple_element<1,OutputTuple>::type>, 
3111                 queueing_port<typename std::tuple_element<2,OutputTuple>::type>, 
3112                 queueing_port<typename std::tuple_element<3,OutputTuple>::type>, 
3113                 queueing_port<typename std::tuple_element<4,OutputTuple>::type>, 
3114                 queueing_port<typename std::tuple_element<5,OutputTuple>::type>, 
3115                 queueing_port<typename std::tuple_element<6,OutputTuple>::type>, 
3116                 queueing_port<typename std::tuple_element<7,OutputTuple>::type>, 
3117                 queueing_port<typename std::tuple_element<8,OutputTuple>::type>, 
3118                 queueing_port<typename std::tuple_element<9,OutputTuple>::type> > input_ports_tuple_type;
3119         typedef OutputTuple output_type;
3120     private:
3121         typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3122     public:
3123         unfolded_join_node(graph &g) : base_type(g) {}
3124     };
3125
3126     //! templated function to refer to input ports of the join node
3127     template<size_t N, typename JNT>
3128     typename std::tuple_element<N, typename JNT::input_ports_tuple_type>::type &input_port(JNT &jn) {
3129         return std::get<N>(jn.inputs());
3130     }
3131
3132     } // namespace internal
3133
3134 using namespace internal::join_policy_namespace;
3135 using internal::input_port;
3136
3137 template<typename OutputTuple, join_policy JP=reserving>
3138 class join_node: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, OutputTuple, JP> {
3139 private:
3140     static const int N = std::tuple_size<OutputTuple>::value;
3141     typedef typename internal::unfolded_join_node<N, OutputTuple, JP> unfolded_type;
3142 public:
3143     typedef OutputTuple output_type;
3144     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
3145     join_node(graph &g) : unfolded_type(g) { }
3146 };
3147
3148     //
3149     // Making edges
3150     //
3151   
3152     //! Makes an edge between a single predecessor and a single successor
3153     template< typename T >
3154     inline void make_edge( sender<T> &p, receiver<T> &s ) {
3155         p.register_successor( s );
3156     }
3157
3158     //! Makes edges between a single predecessor and multiple successors
3159     template< typename T, typename SIterator >
3160     inline void make_edges( sender<T> &p, SIterator s_begin, SIterator s_end ) {
3161         for ( SIterator i = s_begin; i != s_end; ++i ) {
3162             make_edge( p, **i );
3163         }
3164     }
3165
3166     //! Makes edges between a set of predecessors and a single successor
3167     template< typename T, typename PIterator >
3168     inline void make_edges( PIterator p_begin, PIterator p_end, receiver<T> &s ) {
3169         for ( PIterator i = p_begin; i != p_end; ++i ) {
3170             make_edge( **i, s );
3171         }
3172     }
3173
3174 }
3175
3176 #endif
3177