]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/flow_graph.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / flow_graph.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_graph_H
30 #define __TBB_graph_H
31
32 #if !TBB_PREVIEW_GRAPH
33 #error Set TBB_PREVIEW_GRAPH to include graph.h
34 #endif
35
36 #include "tbb_stddef.h"
37 #include "atomic.h"
38 #include "spin_mutex.h"
39 #include "null_mutex.h"
40 #include "spin_rw_mutex.h"
41 #include "null_rw_mutex.h"
42 #include "task.h"
43 #include "concurrent_vector.h"
44 #include "internal/_aggregator_impl.h"
45
46 // use the VC10 or gcc version of tuple if it is available.
47 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
48 #define TBB_PREVIEW_TUPLE 1
49 #include "compat/tuple"
50 #else
51 #include <tuple>
52 #endif
53
54 #include<list>
55 #include<queue>
56
57 /** @file
58   \brief The graph related classes and functions
59
60   There are some applications that best express dependencies as messages
61   passed between nodes in a graph.  These messages may contain data or
62   simply act as signals that a predecessors has completed. The graph
63   class and its associated node classes can be used to express such
64   applcations.
65 */
66
67 namespace tbb {
68 namespace flow {
69
70 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
71 enum concurrency { unlimited = 0, serial = 1 };
72
73 namespace interface6 {
74
75 //! The base of all graph nodes.  Allows them to be stored in a collection for deletion.
76 class graph_node {
77 public:
78     virtual ~graph_node() {} 
79 }; 
80
81 //! An empty class used for messages that mean "I'm done" 
82 class continue_msg {};
83         
84 template< typename T > class sender;
85 template< typename T > class receiver;
86 class continue_receiver;
87         
88 //! Pure virtual template class that defines a sender of messages of type T
89 template< typename T >
90 class sender {
91 public:
92     //! The output type of this sender
93     typedef T output_type;
94         
95     //! The successor type for this node
96     typedef receiver<T> successor_type;
97         
98     virtual ~sender() {}
99         
100     //! Add a new successor to this node
101     virtual bool register_successor( successor_type &r ) = 0;
102         
103     //! Removes a successor from this node
104     virtual bool remove_successor( successor_type &r ) = 0;
105         
106     //! Request an item from the sender
107     virtual bool try_get( T & ) { return false; }
108         
109     //! Reserves an item in the sender 
110     virtual bool try_reserve( T & ) { return false; }
111         
112     //! Releases the reserved item
113     virtual bool try_release( ) { return false; }
114         
115     //! Consumes the reserved item
116     virtual bool try_consume( ) { return false; }
117         
118 };
119         
120         
121 //! Pure virtual template class that defines a receiver of messages of type T
122 template< typename T >
123 class receiver {
124 public:
125         
126     //! The input type of this receiver
127     typedef T input_type;
128         
129     //! The predecessor type for this node
130     typedef sender<T> predecessor_type;
131         
132     //! Destructor
133     virtual ~receiver() {}
134         
135     //! Put an item to the receiver
136     virtual bool try_put( const T& t ) = 0;
137         
138     //! Add a predecessor to the node
139     virtual bool register_predecessor( predecessor_type & ) { return false; }
140         
141     //! Remove a predecessor from the node
142     virtual bool remove_predecessor( predecessor_type & ) { return false; }
143         
144 };
145         
146 //! Base class for receivers of completion messages
147 /** These receivers automatically reset, but cannot be explicitly waited on */
148 class continue_receiver : public receiver< continue_msg > {
149 public:
150         
151     //! The input type
152     typedef continue_msg input_type;
153         
154     //! The predecessor type for this node
155     typedef sender< continue_msg > predecessor_type;
156         
157     //! Constructor
158     continue_receiver( int number_of_predecessors = 0 ) { 
159         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
160         my_current_count = 0;
161     }
162         
163     //! Copy constructor
164     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() { 
165         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
166         my_current_count = 0;
167     }
168         
169     //! Destructor
170     virtual ~continue_receiver() { }
171         
172     //! Increments the trigger threshold
173     /* override */ bool register_predecessor( predecessor_type & ) {
174         spin_mutex::scoped_lock l(my_mutex);
175         ++my_predecessor_count;
176         return true;
177     }
178         
179     //! Decrements the trigger threshold
180     /** Does not check to see if the removal of the predecessor now makes the current count
181         exceed the new threshold.  So removing a predecessor while the graph is active can cause
182         unexpected results. */
183     /* override */ bool remove_predecessor( predecessor_type & ) {
184         spin_mutex::scoped_lock l(my_mutex);
185         --my_predecessor_count;
186         return true;
187     }
188         
189     //! Puts a continue_msg to the receiver
190     /** If the message causes the message count to reach the predecessor count, execute() is called and
191         the message count is reset to 0.  Otherwise the message count is incremented. */
192     /* override */ bool try_put( const input_type & ) {
193         {
194             spin_mutex::scoped_lock l(my_mutex);
195             if ( ++my_current_count < my_predecessor_count ) 
196                 return true;
197             else
198                 my_current_count = 0;
199         }
200         execute();
201         return true;
202     }
203         
204 protected:
205         
206     spin_mutex my_mutex;
207     int my_predecessor_count;
208     int my_current_count;
209     int my_initial_predecessor_count;
210         
211     //! Does whatever should happen when the threshold is reached
212     /** This should be very fast or else spawn a task.  This is
213         called while the sender is blocked in the try_put(). */
214     virtual void execute() = 0;
215         
216 };
217
218 #include "internal/_flow_graph_impl.h"
219 using namespace internal::graph_policy_namespace;
220
221 //! The graph class
222 /** This class serves as a handle to the graph */
223 class graph : tbb::internal::no_copy {
224         
225     template< typename Body >
226     class run_task : public task {
227     public: 
228         run_task( Body& body ) : my_body(body) {}
229         task *execute() {
230             my_body();
231             return NULL;
232         }
233     private:
234         Body my_body;
235     };
236         
237     template< typename Receiver, typename Body >
238     class run_and_put_task : public task {
239     public: 
240         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
241         task *execute() {
242             my_receiver.try_put( my_body() );
243             return NULL;
244         }
245     private:
246         Receiver &my_receiver;
247         Body my_body;
248     };
249         
250 public:
251         
252         
253     //! Constructs a graph withy no nodes.
254     graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
255         my_root_task->set_ref_count(1);
256     }
257         
258     //! Destroys the graph.
259     /** Calls wait_for_all on the graph, deletes all of the nodes appended by calls to add, and then 
260         destroys the root task of the graph. */ 
261     ~graph() {
262         wait_for_all();
263         my_root_task->set_ref_count(0);
264         task::destroy( *my_root_task );
265     }
266         
267         
268     //! Used to register that an external entity may still interact with the graph.
269     /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
270         is made. */
271     void increment_wait_count() { 
272         if (my_root_task)
273             my_root_task->increment_ref_count();
274     }
275         
276     //! Deregisters an external entity that may have interacted with the graph.
277     /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
278         matches the number of increment_wait_count calls. */
279     void decrement_wait_count() { 
280         if (my_root_task)
281             my_root_task->decrement_ref_count(); 
282     }
283         
284     //! Spawns a task that runs a body and puts its output to a specific receiver
285     /** The task is spawned as a child of the graph. This is useful for running tasks 
286         that need to block a wait_for_all() on the graph.  For example a one-off source. */
287     template< typename Receiver, typename Body >
288         void run( Receiver &r, Body body ) {
289        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
290            run_and_put_task< Receiver, Body >( r, body ) );
291     }
292         
293     //! Spawns a task that runs a function object 
294     /** The task is spawned as a child of the graph. This is useful for running tasks 
295         that need to block a wait_for_all() on the graph. For example a one-off source. */
296     template< typename Body >
297     void run( Body body ) {
298        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
299            run_task< Body >( body ) );
300     }
301         
302     //! Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
303     /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
304     void wait_for_all() {
305         if (my_root_task)
306             my_root_task->wait_for_all();
307         my_root_task->set_ref_count(1);
308     }
309         
310     //! Returns the root task of the graph
311     task * root_task() {
312         return my_root_task;
313     }
314         
315 private:
316         
317     task *my_root_task;
318         
319 };
320
321 #include "internal/_flow_graph_node_impl.h"
322
323 //! An executable node that acts as a source, i.e. it has no predecessors
324 template < typename Output >
325 class source_node : public graph_node, public sender< Output > {
326 public:
327         
328     //! The type of the output message, which is complete
329     typedef Output output_type;           
330         
331     //! The type of successors of this node
332     typedef receiver< Output > successor_type;
333         
334     //! Constructor for a node with a successor
335     template< typename Body >
336     source_node( graph &g, Body body, bool is_active = true )
337         : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
338         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
339         my_reserved(false), my_has_cached_item(false) 
340     { 
341         my_successors.set_owner(this);
342     }
343         
344     //! Copy constructor
345     source_node( const source_node& src ) :
346 #if ( __TBB_GCC_VERSION < 40202 )
347         graph_node(), sender<Output>(),
348 #endif
349         my_root_task( src.my_root_task), my_active(src.init_my_active),
350         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
351         my_reserved(false), my_has_cached_item(false)
352     {
353         my_successors.set_owner(this);
354     }
355
356     //! The destructor
357     ~source_node() { delete my_body; }
358         
359     //! Add a new successor to this node
360     /* override */ bool register_successor( receiver<output_type> &r ) {
361         spin_mutex::scoped_lock lock(my_mutex);
362         my_successors.register_successor(r);
363         if ( my_active )
364             spawn_put();
365         return true;
366     }
367         
368     //! Removes a successor from this node
369     /* override */ bool remove_successor( receiver<output_type> &r ) {
370         spin_mutex::scoped_lock lock(my_mutex);
371         my_successors.remove_successor(r);
372         return true;
373     }
374         
375     //! Request an item from the node
376     /*override */ bool try_get( output_type &v ) {
377         spin_mutex::scoped_lock lock(my_mutex);
378         if ( my_reserved )  
379             return false;
380         
381         if ( my_has_cached_item ) {
382             v = my_cached_item;
383             my_has_cached_item = false;
384         } else if ( (*my_body)(v) == false ) {
385             return false;
386         }
387         return true;
388     }
389         
390     //! Reserves an item.
391     /* override */ bool try_reserve( output_type &v ) {
392         spin_mutex::scoped_lock lock(my_mutex);
393         if ( my_reserved ) {
394             return false;
395         }
396         
397         if ( !my_has_cached_item && (*my_body)(my_cached_item) )  
398             my_has_cached_item = true;
399         
400         if ( my_has_cached_item ) {
401             v = my_cached_item;
402             my_reserved = true;
403             return true;
404         } else {
405             return false;
406         }
407     }
408         
409     //! Release a reserved item.  
410     /**  true = item has been released and so remains in sender, dest must request or reserve future items */
411     /* override */ bool try_release( ) {
412         spin_mutex::scoped_lock lock(my_mutex);
413         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
414         my_reserved = false;
415         spawn_put();
416         return true;
417     }
418         
419     //! Consumes a reserved item
420     /* override */ bool try_consume( ) {
421         spin_mutex::scoped_lock lock(my_mutex);
422         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
423         my_reserved = false;
424         my_has_cached_item = false;
425         if ( !my_successors.empty() ) {
426             spawn_put();
427         }
428         return true;
429     }
430         
431     //! Activates a node that was created in the inactive state
432     void activate() {
433         spin_mutex::scoped_lock lock(my_mutex);
434         my_active = true;
435         if ( !my_successors.empty() )
436             spawn_put();
437     }
438         
439 private:
440         
441     task *my_root_task;
442     spin_mutex my_mutex;
443     bool my_active;
444     bool init_my_active;
445     internal::source_body<output_type> *my_body;
446     internal::broadcast_cache< output_type > my_successors;
447     bool my_reserved;
448     bool my_has_cached_item;
449     output_type my_cached_item;
450         
451     friend class internal::source_task< source_node< output_type > >;
452         
453     //! Applies the body
454     /* override */ void apply_body( ) {
455         output_type v;
456         if ( try_reserve(v) == false )
457             return;
458         
459         if ( my_successors.try_put( v ) ) 
460             try_consume();
461         else
462             try_release();
463     }
464         
465     //! Spawns a task that applies the body
466     /* override */ void spawn_put( ) {
467         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
468            internal::source_task< source_node< output_type > >( *this ) ); 
469     }
470         
471 };
472         
473 //! Implements a function node that supports Input -> Output
474 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
475 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
476 public:
477         
478     typedef Input input_type;
479     typedef Output output_type;
480     typedef sender< input_type > predecessor_type;
481     typedef receiver< output_type > successor_type;
482         
483     //! Constructor
484     template< typename Body >
485     function_node( graph &g, size_t concurrency, Body body )
486     : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
487         my_successors.set_owner(this);
488     }
489
490     //! Copy constructor
491     function_node( const function_node& src ) : 
492 #if ( __TBB_GCC_VERSION < 40202 )
493         graph_node(), 
494 #endif
495         internal::function_input<input_type,output_type,Allocator>( src )
496 #if ( __TBB_GCC_VERSION < 40202 )
497         , internal::function_output<Output>()
498 #endif
499     {
500         my_successors.set_owner(this);
501     }
502         
503 protected:
504
505     internal::broadcast_cache<output_type> my_successors; 
506     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
507         
508 };
509
510 //! Implements a function node that supports Input -> Output
511 template < typename Input, typename Output, typename Allocator >
512 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
513 public:
514         
515     typedef Input input_type;
516     typedef Output output_type;
517     typedef sender< input_type > predecessor_type;
518     typedef receiver< output_type > successor_type;
519         
520     //! Constructor
521     template< typename Body >
522     function_node( graph &g, size_t concurrency, Body body )
523     : internal::function_input< input_type, output_type, Allocator >( g, concurrency, body, new internal::function_input_queue< input_type, Allocator >() ) {
524         my_successors.set_owner(this);
525     }
526
527     //! Copy constructor
528     function_node( const function_node& src ) : 
529 #if ( __TBB_GCC_VERSION < 40202 )
530         graph_node(), 
531 #endif
532         internal::function_input<input_type,output_type,Allocator>( src, new internal::function_input_queue< input_type, Allocator >() )
533 #if ( __TBB_GCC_VERSION < 40202 )
534         , internal::function_output<Output>()
535 #endif
536     {
537         my_successors.set_owner(this);
538     }
539
540 protected:
541
542     internal::broadcast_cache<output_type> my_successors; 
543     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
544         
545 };
546         
547 //! Implements an executable node that supports continue_msg -> Output
548 template <typename Output>
549 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
550 public:
551         
552     typedef continue_msg input_type;
553     typedef Output output_type;
554     typedef sender< input_type > predecessor_type;
555     typedef receiver< output_type > successor_type;
556         
557      //! Constructor for executable node with continue_msg -> Output
558      template <typename Body >
559      continue_node( graph &g, Body body )
560              : internal::continue_input<output_type>( g, body ) {
561          my_successors.set_owner(this);
562      }
563         
564     //! Constructor for executable node with continue_msg -> Output
565     template <typename Body >
566     continue_node( graph &g, int number_of_predecessors, Body body )
567         : internal::continue_input<output_type>( g, number_of_predecessors, body )
568     {
569         my_successors.set_owner(this);
570     }
571  
572     //! Copy constructor       
573     continue_node( const continue_node& src ) :
574 #if ( __TBB_GCC_VERSION < 40202 )
575         graph_node(),
576 #endif
577         internal::continue_input<output_type>(src)
578 #if ( __TBB_GCC_VERSION < 40202 )
579         , internal::function_output<Output>()
580 #endif
581     {
582         my_successors.set_owner(this);
583     }
584
585 protected:
586         
587     internal::broadcast_cache<output_type> my_successors; 
588     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
589         
590 };
591         
592 template< typename T >
593 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
594 public:
595         
596     typedef T input_type;
597     typedef T output_type;
598     typedef sender< input_type > predecessor_type;
599     typedef receiver< output_type > successor_type;
600         
601     overwrite_node() : my_buffer_is_valid(false) {
602         my_successors.set_owner( this );
603     }
604
605     // Copy constructor; doesn't take anything from src; default won't work
606     overwrite_node( const overwrite_node& ) : 
607 #if ( __TBB_GCC_VERSION < 40202 )
608         graph_node(), receiver<T>(), sender<T>(),
609 #endif
610         my_buffer_is_valid(false) 
611     {
612         my_successors.set_owner( this );
613     }
614         
615     ~overwrite_node() {}
616         
617     /* override */ bool register_successor( successor_type &s ) {
618         spin_mutex::scoped_lock l( my_mutex );
619         if ( my_buffer_is_valid ) {
620             // We have a valid value that must be forwarded immediately.
621             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
622                 // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
623                 my_successors.register_successor( s );
624                 return true;
625             } else {
626                 // We don't add the successor: it rejected our put and we became its predecessor instead
627                 return false;
628             }
629         } else {
630             // No valid value yet, just add as successor
631             my_successors.register_successor( s );
632             return true;
633         }
634     }
635         
636     /* override */ bool remove_successor( successor_type &s ) {
637         spin_mutex::scoped_lock l( my_mutex );
638         my_successors.remove_successor(s);
639         return true;
640     }
641         
642     /* override */ bool try_put( const T &v ) {
643         spin_mutex::scoped_lock l( my_mutex );
644         my_buffer = v;
645         my_buffer_is_valid = true;
646         my_successors.try_put(v);
647         return true;
648     }
649         
650     /* override */ bool try_get( T &v ) {
651         spin_mutex::scoped_lock l( my_mutex );
652         if ( my_buffer_is_valid ) {
653             v = my_buffer;
654             return true;
655         } else {
656             return false;
657         }
658     }
659         
660     bool is_valid() {
661        spin_mutex::scoped_lock l( my_mutex );
662        return my_buffer_is_valid;
663     }
664         
665     void clear() {
666        spin_mutex::scoped_lock l( my_mutex );
667        my_buffer_is_valid = false;
668     }
669         
670 protected:
671         
672     spin_mutex my_mutex;
673     internal::broadcast_cache< T, null_rw_mutex > my_successors;
674     T my_buffer;
675     bool my_buffer_is_valid;
676         
677 };
678         
679 template< typename T >
680 class write_once_node : public overwrite_node<T> {
681 public:
682         
683     typedef T input_type;
684     typedef T output_type;
685     typedef sender< input_type > predecessor_type;
686     typedef receiver< output_type > successor_type;
687         
688     //! Constructor
689     write_once_node() : overwrite_node<T>() {}
690
691     //! Copy constructor: call base class copy constructor
692     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
693
694     /* override */ bool try_put( const T &v ) {
695         spin_mutex::scoped_lock l( this->my_mutex );
696         if ( this->my_buffer_is_valid ) {
697             return false;
698         } else {
699             this->my_buffer = v;
700             this->my_buffer_is_valid = true;
701             this->my_successors.try_put(v);
702             return true;
703         }
704     }
705 };
706         
707 //! Forwards messages of type T to all successors
708 template <typename T>
709 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
710         
711     internal::broadcast_cache<T> my_successors;
712         
713 public:
714         
715     typedef T input_type;
716     typedef T output_type;
717     typedef sender< input_type > predecessor_type;
718     typedef receiver< output_type > successor_type;
719         
720     broadcast_node( ) {
721        my_successors.set_owner( this ); 
722     }
723         
724     // Copy constructor
725     broadcast_node( const broadcast_node& ) 
726 #if ( __TBB_GCC_VERSION < 40202 )
727         : graph_node(), receiver<T>(), sender<T>()
728 #endif
729     {
730        my_successors.set_owner( this ); 
731     }
732         
733     //! Adds a successor
734     virtual bool register_successor( receiver<T> &r ) {
735         my_successors.register_successor( r );
736         return true;
737     }
738         
739     //! Removes s as a successor
740     virtual bool remove_successor( receiver<T> &r ) {
741         my_successors.remove_successor( r );
742         return true;
743     }
744         
745     /* override */ bool try_put( const T &t ) {
746         my_successors.try_put(t);
747         return true;
748     }
749         
750 };
751
752 #include "internal/_flow_graph_item_buffer_impl.h"
753
754 //! Forwards messages in arbitrary order
755 template <typename T, typename A=cache_aligned_allocator<T> >
756 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
757 public:
758     typedef T input_type;
759     typedef T output_type;
760     typedef sender< input_type > predecessor_type;
761     typedef receiver< output_type > successor_type;
762     typedef buffer_node<T, A> my_class;
763 protected:
764     typedef size_t size_type;
765     internal::round_robin_cache< T, null_rw_mutex > my_successors;
766         
767     task *my_parent;
768         
769     friend class internal::forward_task< buffer_node< T, A > >;
770         
771     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
772     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
773         
774     // implements the aggregator_operation concept
775     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
776     public:
777         char type;
778         T *elem;
779         successor_type *r;
780         buffer_operation(const T& e, op_type t) :
781             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
782         buffer_operation(op_type t) : type(char(t)), r(NULL) {}
783     };
784         
785     bool forwarder_busy;
786     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
787     friend class internal::aggregating_functor<my_class, buffer_operation>;
788     internal::aggregator< my_handler, buffer_operation> my_aggregator;
789         
790     virtual void handle_operations(buffer_operation *op_list) {
791         buffer_operation *tmp;
792         bool try_forwarding=false;
793         while (op_list) {
794             tmp = op_list;
795             op_list = op_list->next;
796             switch (tmp->type) {
797             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
798             case rem_succ: internal_rem_succ(tmp); break;
799             case req_item: internal_pop(tmp); break;
800             case res_item: internal_reserve(tmp); break;
801             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
802             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
803             case put_item: internal_push(tmp);  try_forwarding = true; break;
804             case try_fwd:  internal_forward(tmp); break;
805             }
806         }
807         if (try_forwarding && !forwarder_busy) {
808             forwarder_busy = true;
809             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
810         }
811     }
812         
813     //! This is executed by an enqueued task, the "forwarder"
814     virtual void forward() {
815         buffer_operation op_data(try_fwd);
816         do {
817             op_data.status = WAIT;
818             my_aggregator.execute(&op_data);
819         } while (op_data.status == SUCCEEDED);
820     }
821         
822     //! Register successor
823     virtual void internal_reg_succ(buffer_operation *op) {
824         my_successors.register_successor(*(op->r));
825         __TBB_store_with_release(op->status, SUCCEEDED);
826     }
827         
828     //! Remove successor
829     virtual void internal_rem_succ(buffer_operation *op) {
830         my_successors.remove_successor(*(op->r));
831         __TBB_store_with_release(op->status, SUCCEEDED);
832     }
833         
834     //! Tries to forward valid items to successors
835     virtual void internal_forward(buffer_operation *op) {
836         T i_copy;
837         bool success = false; // flagged when a successor accepts
838         size_type counter = my_successors.size();
839         // Try forwarding, giving each successor a chance
840         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
841             this->fetch_back(i_copy);
842             if( my_successors.try_put(i_copy) ) {
843                 this->invalidate_back();
844                 --(this->my_tail);
845                 success = true; // found an accepting successor
846             }
847             --counter;
848         }
849         if (success && !counter)
850             __TBB_store_with_release(op->status, SUCCEEDED);
851         else {
852             __TBB_store_with_release(op->status, FAILED);
853             forwarder_busy = false;
854         }
855     }
856         
857     virtual void internal_push(buffer_operation *op) {
858         this->push_back(*(op->elem));
859         __TBB_store_with_release(op->status, SUCCEEDED);
860     }
861         
862     virtual void internal_pop(buffer_operation *op) {
863         if(this->pop_back(*(op->elem))) {
864             __TBB_store_with_release(op->status, SUCCEEDED);
865         }
866         else {
867             __TBB_store_with_release(op->status, FAILED);
868         }
869     }
870         
871     virtual void internal_reserve(buffer_operation *op) {
872         if(this->reserve_front(*(op->elem))) {
873             __TBB_store_with_release(op->status, SUCCEEDED);
874         }
875         else {
876             __TBB_store_with_release(op->status, FAILED);
877         }
878     }
879         
880     virtual void internal_consume(buffer_operation *op) {
881         this->consume_front();
882         __TBB_store_with_release(op->status, SUCCEEDED);
883     }
884         
885     virtual void internal_release(buffer_operation *op) {
886         this->release_front();
887         __TBB_store_with_release(op->status, SUCCEEDED);
888     }
889         
890 public:
891     //! Constructor
892     buffer_node( graph &g ) : reservable_item_buffer<T>(),
893         my_parent( g.root_task() ), forwarder_busy(false) {
894         my_successors.set_owner(this);
895         my_aggregator.initialize_handler(my_handler(this));
896     }
897
898     //! Copy constructor
899     buffer_node( const buffer_node& src ) : 
900 #if ( __TBB_GCC_VERSION < 40202 )
901         graph_node(), 
902 #endif
903         reservable_item_buffer<T>(),
904 #if ( __TBB_GCC_VERSION < 40202 )
905         receiver<T>(), sender<T>(),
906 #endif
907         my_parent( src.my_parent )  
908     {
909         forwarder_busy = false;
910         my_successors.set_owner(this);
911         my_aggregator.initialize_handler(my_handler(this));
912     }
913
914     virtual ~buffer_node() {}
915         
916     //
917     // message sender implementation
918     //
919         
920     //! Adds a new successor.
921     /** Adds successor r to the list of successors; may forward tasks.  */
922     /* override */ bool register_successor( receiver<output_type> &r ) {
923         buffer_operation op_data(reg_succ);
924         op_data.r = &r;
925         my_aggregator.execute(&op_data);
926         return true;
927     }
928         
929     //! Removes a successor.
930     /** Removes successor r from the list of successors.
931         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
932     /* override */ bool remove_successor( receiver<output_type> &r ) {
933         r.remove_predecessor(*this);
934         buffer_operation op_data(rem_succ);
935         op_data.r = &r;
936         my_aggregator.execute(&op_data);
937         return true;
938     }
939         
940     //! Request an item from the buffer_node
941     /**  true = v contains the returned item<BR>
942          false = no item has been returned */
943     /* override */ bool try_get( T &v ) {
944         buffer_operation op_data(req_item);
945         op_data.elem = &v;
946         my_aggregator.execute(&op_data);
947         return (op_data.status==SUCCEEDED);
948     }
949         
950     //! Reserves an item.
951     /**  false = no item can be reserved<BR>
952          true = an item is reserved */
953     /* override */ bool try_reserve( T &v ) {
954         buffer_operation op_data(res_item);
955         op_data.elem = &v;
956         my_aggregator.execute(&op_data);
957         return (op_data.status==SUCCEEDED);
958     }
959         
960     //! Release a reserved item.
961     /**  true = item has been released and so remains in sender */
962     /* override */ bool try_release() {
963         buffer_operation op_data(rel_res);
964         my_aggregator.execute(&op_data);
965         return true;
966     }
967         
968     //! Consumes a reserved item.
969     /** true = item is removed from sender and reservation removed */
970     /* override */ bool try_consume() {
971         buffer_operation op_data(con_res);
972         my_aggregator.execute(&op_data);
973         return true;
974     }
975         
976     //! Receive an item
977     /** true is always returned */
978     /* override */ bool try_put(const T &t) {
979         buffer_operation op_data(t, put_item);
980         my_aggregator.execute(&op_data);
981         return true;
982     }
983 };
984         
985         
986 //! Forwards messages in FIFO order
987 template <typename T, typename A=cache_aligned_allocator<T> >
988 class queue_node : public buffer_node<T, A> {
989 protected:
990 typedef typename buffer_node<T, A>::size_type size_type;
991 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
992         
993     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
994         
995     //! Tries to forward valid items to successors
996     /* override */ void internal_forward(queue_operation *op) {
997         T i_copy;
998         bool success = false; // flagged when a successor accepts
999         size_type counter = this->my_successors.size();
1000         if (this->my_reserved || !this->item_valid(this->my_head)){
1001             __TBB_store_with_release(op->status, FAILED);
1002             this->forwarder_busy = false;
1003             return;
1004         }
1005         // Keep trying to send items while there is at least one accepting successor
1006         while (counter>0 && this->item_valid(this->my_head)) {
1007             this->fetch_front(i_copy);
1008             if(this->my_successors.try_put(i_copy)) {
1009                  this->invalidate_front();
1010                  ++(this->my_head);
1011                 success = true; // found an accepting successor
1012             }
1013             --counter;
1014         }
1015         if (success && !counter)
1016             __TBB_store_with_release(op->status, SUCCEEDED);
1017         else {
1018             __TBB_store_with_release(op->status, FAILED);
1019             this->forwarder_busy = false;
1020         }
1021     }
1022         
1023     /* override */ void internal_pop(queue_operation *op) {
1024         if ( this->my_reserved || !this->item_valid(this->my_head)){
1025             __TBB_store_with_release(op->status, FAILED);
1026         }
1027         else {
1028             this->pop_front(*(op->elem));
1029             __TBB_store_with_release(op->status, SUCCEEDED);
1030         }
1031     }
1032     /* override */ void internal_reserve(queue_operation *op) {
1033         if (this->my_reserved || !this->item_valid(this->my_head)) {
1034             __TBB_store_with_release(op->status, FAILED);
1035         }
1036         else {
1037             this->my_reserved = true;
1038             this->fetch_front(*(op->elem));
1039             this->invalidate_front();
1040             __TBB_store_with_release(op->status, SUCCEEDED);
1041         }
1042     }
1043     /* override */ void internal_consume(queue_operation *op) {
1044         this->consume_front();
1045         __TBB_store_with_release(op->status, SUCCEEDED);
1046     }
1047         
1048 public:
1049         
1050     typedef T input_type;
1051     typedef T output_type;
1052     typedef sender< input_type > predecessor_type;
1053     typedef receiver< output_type > successor_type;
1054         
1055     //! Constructor
1056     queue_node( graph &g ) : buffer_node<T, A>(g) {}
1057
1058     //! Copy constructor
1059     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
1060 };
1061         
1062 //! Forwards messages in sequence order
1063 template< typename T, typename A=cache_aligned_allocator<T> >
1064 class sequencer_node : public queue_node<T, A> {
1065     internal::function_body< T, size_t > *my_sequencer;
1066 public:
1067         
1068     typedef T input_type;
1069     typedef T output_type;
1070     typedef sender< input_type > predecessor_type;
1071     typedef receiver< output_type > successor_type;
1072         
1073     //! Constructor
1074     template< typename Sequencer >
1075     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
1076         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
1077
1078     //! Copy constructor
1079     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
1080         my_sequencer( src.my_sequencer->clone() ) {}
1081         
1082     //! Destructor
1083     ~sequencer_node() { delete my_sequencer; }
1084 protected:
1085     typedef typename buffer_node<T, A>::size_type size_type;
1086     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
1087         
1088     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1089         
1090 private:
1091     /* override */ void internal_push(sequencer_operation *op) {
1092         size_type tag = (*my_sequencer)(*(op->elem));
1093         
1094         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1095         
1096         if(this->size() > this->capacity())
1097             this->grow_my_array(this->size());  // tail already has 1 added to it
1098         this->item(tag) = std::make_pair( *(op->elem), true );
1099         __TBB_store_with_release(op->status, SUCCEEDED);
1100     }
1101 };
1102         
1103 //! Forwards messages in priority order
1104 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
1105 class priority_queue_node : public buffer_node<T, A> {
1106 public:
1107     typedef T input_type;
1108     typedef T output_type;
1109     typedef sender< input_type > predecessor_type;
1110     typedef receiver< output_type > successor_type;
1111         
1112     //! Constructor
1113     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
1114
1115     //! Copy constructor
1116     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
1117         
1118 protected:
1119     typedef typename buffer_node<T, A>::size_type size_type;
1120     typedef typename buffer_node<T, A>::item_type item_type;
1121     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
1122         
1123     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1124         
1125     /* override */ void handle_operations(prio_operation *op_list) {
1126         prio_operation *tmp /*, *pop_list*/ ;
1127         bool try_forwarding=false;
1128         while (op_list) {
1129             tmp = op_list;
1130             op_list = op_list->next;
1131             switch (tmp->type) {
1132             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
1133             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
1134             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
1135             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
1136             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
1137             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
1138             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
1139             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
1140             }
1141         }
1142         // process pops!  for now, no special pop processing
1143         if (mark<this->my_tail) heapify();
1144         if (try_forwarding && !this->forwarder_busy) {
1145             this->forwarder_busy = true;
1146             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
1147         }
1148     }
1149         
1150     //! Tries to forward valid items to successors
1151     /* override */ void internal_forward(prio_operation *op) {
1152         T i_copy;
1153         bool success = false; // flagged when a successor accepts
1154         size_type counter = this->my_successors.size();
1155         
1156         if (this->my_reserved || this->my_tail == 0) {
1157             __TBB_store_with_release(op->status, FAILED);
1158             this->forwarder_busy = false;
1159             return;
1160         }
1161         // Keep trying to send while there exists an accepting successor
1162         while (counter>0 && this->my_tail > 0) {
1163             i_copy = this->my_array[0].first;
1164             bool msg = this->my_successors.try_put(i_copy);
1165             if ( msg == true ) {
1166                  if (mark == this->my_tail) --mark;
1167                 --(this->my_tail);
1168                 this->my_array[0].first=this->my_array[this->my_tail].first;
1169                 if (this->my_tail > 1) // don't reheap for heap of size 1
1170                     reheap();
1171                 success = true; // found an accepting successor
1172             }
1173             --counter;
1174         }
1175         if (success && !counter)
1176             __TBB_store_with_release(op->status, SUCCEEDED);
1177         else {
1178             __TBB_store_with_release(op->status, FAILED);
1179             this->forwarder_busy = false;
1180         }
1181     }
1182         
1183     /* override */ void internal_push(prio_operation *op) {
1184         if ( this->my_tail >= this->my_array_size )
1185             this->grow_my_array( this->my_tail + 1 );
1186         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
1187         ++(this->my_tail);
1188         __TBB_store_with_release(op->status, SUCCEEDED);
1189     }
1190     /* override */ void internal_pop(prio_operation *op) {
1191         if ( this->my_reserved == true || this->my_tail == 0 ) {
1192             __TBB_store_with_release(op->status, FAILED);
1193         }
1194         else {
1195             if (mark<this->my_tail &&
1196                 compare(this->my_array[0].first,
1197                         this->my_array[this->my_tail-1].first)) {
1198                 // there are newly pushed elems; last one higher than top
1199                 // copy the data
1200                 *(op->elem) = this->my_array[this->my_tail-1].first;
1201                 --(this->my_tail);
1202                 __TBB_store_with_release(op->status, SUCCEEDED);
1203             }
1204             else { // extract and push the last element down heap
1205                 *(op->elem) = this->my_array[0].first; // copy the data
1206                 if (mark == this->my_tail) --mark;
1207                 --(this->my_tail);
1208                 __TBB_store_with_release(op->status, SUCCEEDED);
1209                 this->my_array[0].first=this->my_array[this->my_tail].first;
1210                 if (this->my_tail > 1) // don't reheap for heap of size 1
1211                     reheap();
1212             }
1213         }
1214     }
1215     /* override */ void internal_reserve(prio_operation *op) {
1216         if (this->my_reserved == true || this->my_tail == 0) {
1217             __TBB_store_with_release(op->status, FAILED);
1218         }
1219         else {
1220             this->my_reserved = true;
1221             *(op->elem) = reserved_item = this->my_array[0].first;
1222             if (mark == this->my_tail) --mark;
1223             --(this->my_tail);
1224             __TBB_store_with_release(op->status, SUCCEEDED);
1225             this->my_array[0].first = this->my_array[this->my_tail].first;
1226             if (this->my_tail > 1) // don't reheap for heap of size 1
1227                 reheap();
1228         }
1229     }
1230     /* override */ void internal_consume(prio_operation *op) {
1231         this->my_reserved = false;
1232         __TBB_store_with_release(op->status, SUCCEEDED);
1233     }
1234     /* override */ void internal_release(prio_operation *op) {
1235         if (this->my_tail >= this->my_array_size)
1236             this->grow_my_array( this->my_tail + 1 );
1237         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
1238         ++(this->my_tail);
1239         this->my_reserved = false;
1240         __TBB_store_with_release(op->status, SUCCEEDED);
1241         heapify();
1242     }
1243 private:
1244     Compare compare;
1245     size_type mark;
1246     input_type reserved_item;
1247         
1248     void heapify() {
1249         if (!mark) mark = 1;
1250         for (; mark<this->my_tail; ++mark) { // for each unheaped element
1251             size_type cur_pos = mark;
1252             input_type to_place = this->my_array[mark].first;
1253             do { // push to_place up the heap
1254                 size_type parent = (cur_pos-1)>>1;
1255                 if (!compare(this->my_array[parent].first, to_place))
1256                     break;
1257                 this->my_array[cur_pos].first = this->my_array[parent].first;
1258                 cur_pos = parent;
1259             } while( cur_pos );
1260             this->my_array[cur_pos].first = to_place;
1261         }
1262     }
1263         
1264     void reheap() {
1265         size_type cur_pos=0, child=1;
1266         while (child < mark) {
1267             size_type target = child;
1268             if (child+1<mark &&
1269                 compare(this->my_array[child].first,
1270                         this->my_array[child+1].first))
1271                 ++target;
1272             // target now has the higher priority child
1273             if (compare(this->my_array[target].first,
1274                         this->my_array[this->my_tail].first))
1275                 break;
1276             this->my_array[cur_pos].first = this->my_array[target].first;
1277             cur_pos = target;
1278             child = (cur_pos<<1)+1;
1279         }
1280         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
1281     }
1282 };
1283         
1284 //! Forwards messages only if the threshold has not been reached
1285 /** This node forwards items until its threshold is reached.
1286     It contains no buffering.  If the downstream node rejects, the
1287     message is dropped. */
1288 template< typename T >
1289 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1290 public:
1291         
1292     typedef T input_type;
1293     typedef T output_type;
1294     typedef sender< input_type > predecessor_type;
1295     typedef receiver< output_type > successor_type;
1296         
1297 private:
1298         
1299     task *my_root_task;
1300     size_t my_threshold;
1301     size_t my_count;
1302     internal::predecessor_cache< T > my_predecessors;
1303     spin_mutex my_mutex;
1304     internal::broadcast_cache< T > my_successors;
1305     int init_decrement_predecessors;
1306
1307     friend class internal::forward_task< limiter_node<T> >;
1308         
1309     // Let decrementer call decrement_counter()
1310     friend class internal::decrementer< limiter_node<T> >;
1311         
1312     void decrement_counter() {
1313         input_type v;
1314         
1315         // If we can't get / put an item immediately then drop the count
1316         if ( my_predecessors.get_item( v ) == false 
1317              || my_successors.try_put(v) == false ) {
1318             spin_mutex::scoped_lock lock(my_mutex);
1319             --my_count;
1320             if ( !my_predecessors.empty() ) 
1321                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1322                             internal::forward_task< limiter_node<T> >( *this ) );
1323         }
1324     }
1325         
1326     void forward() {
1327         {
1328             spin_mutex::scoped_lock lock(my_mutex);
1329             if ( my_count < my_threshold ) 
1330                 ++my_count;
1331             else
1332                 return;
1333         }
1334         decrement_counter();
1335     }
1336         
1337 public:
1338         
1339     //! The internal receiver< continue_msg > that decrements the count
1340     internal::decrementer< limiter_node<T> > decrement;
1341         
1342     //! Constructor
1343     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) : 
1344         my_root_task(g.root_task()), my_threshold(threshold), my_count(0), 
1345         init_decrement_predecessors(num_decrement_predecessors), 
1346         decrement(num_decrement_predecessors) 
1347     {
1348         my_predecessors.set_owner(this);
1349         my_successors.set_owner(this);
1350         decrement.set_owner(this);
1351     }
1352         
1353     //! Copy constructor
1354     limiter_node( const limiter_node& src ) : 
1355 #if ( __TBB_GCC_VERSION < 40202 )
1356         graph_node(), receiver<T>(), sender<T>(),
1357 #endif
1358         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0), 
1359         init_decrement_predecessors(src.init_decrement_predecessors), 
1360         decrement(src.init_decrement_predecessors) 
1361     {
1362         my_predecessors.set_owner(this);
1363         my_successors.set_owner(this);
1364         decrement.set_owner(this);
1365     }
1366
1367     //! Replace the current successor with this new successor
1368     /* override */ bool register_successor( receiver<output_type> &r ) {
1369         my_successors.register_successor(r);
1370         return true;
1371     }
1372         
1373     //! Removes a successor from this node
1374     /** r.remove_predecessor(*this) is also called. */
1375     /* override */ bool remove_successor( receiver<output_type> &r ) {
1376         r.remove_predecessor(*this);
1377         my_successors.remove_successor(r);
1378         return true;
1379     }
1380         
1381     //! Puts an item to this receiver
1382     /* override */ bool try_put( const T &t ) {
1383         {
1384             spin_mutex::scoped_lock lock(my_mutex);
1385             if ( my_count >= my_threshold ) 
1386                 return false;
1387             else
1388                 ++my_count; 
1389         }
1390         
1391         bool msg = my_successors.try_put(t);
1392         
1393         if ( msg != true ) {
1394             spin_mutex::scoped_lock lock(my_mutex);
1395             --my_count;
1396             if ( !my_predecessors.empty() ) 
1397                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1398                             internal::forward_task< limiter_node<T> >( *this ) );
1399         }
1400         
1401         return msg;
1402     }
1403         
1404     //! Removes src from the list of cached predecessors.
1405     /* override */ bool register_predecessor( predecessor_type &src ) {
1406         spin_mutex::scoped_lock lock(my_mutex);
1407         my_predecessors.add( src );
1408         if ( my_count < my_threshold && !my_successors.empty() ) 
1409             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
1410                            internal::forward_task< limiter_node<T> >( *this ) );
1411         return true;
1412     }
1413         
1414     //! Removes src from the list of cached predecessors.
1415     /* override */ bool remove_predecessor( predecessor_type &src ) {
1416         my_predecessors.remove( src );
1417         return true;
1418     }
1419         
1420 };
1421
1422 #include "internal/_flow_graph_join_impl.h"
1423
1424 using internal::reserving_port;
1425 using internal::queueing_port;
1426 using internal::tag_matching_port;
1427 using internal::input_port;
1428 using internal::tag_value;
1429 using internal::NO_TAG;
1430
1431 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
1432
1433 template<typename OutputTuple>
1434 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
1435 private:
1436     static const int N = std::tuple_size<OutputTuple>::value;
1437     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
1438 public:
1439     typedef OutputTuple output_type;
1440     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1441     join_node(graph &g) : unfolded_type(g) { }
1442     join_node(const join_node &other) : unfolded_type(other) {}
1443 };
1444
1445 template<typename OutputTuple>
1446 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
1447 private:
1448     static const int N = std::tuple_size<OutputTuple>::value;
1449     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
1450 public:
1451     typedef OutputTuple output_type;
1452     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1453     join_node(graph &g) : unfolded_type(g) { }
1454     join_node(const join_node &other) : unfolded_type(other) {}
1455 };
1456
1457 // template for tag_matching join_node
1458 template<typename OutputTuple>
1459 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
1460       tag_matching_port, OutputTuple, tag_matching> {
1461 private:
1462     static const int N = std::tuple_size<OutputTuple>::value;
1463     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
1464 public:
1465     typedef OutputTuple output_type;
1466     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1467     template<typename B0, typename B1>
1468     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
1469     template<typename B0, typename B1, typename B2>
1470     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
1471     template<typename B0, typename B1, typename B2, typename B3>
1472     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
1473     template<typename B0, typename B1, typename B2, typename B3, typename B4>
1474     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
1475     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
1476     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
1477     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
1478     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
1479     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
1480     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
1481     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
1482     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
1483     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
1484     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
1485     join_node(const join_node &other) : unfolded_type(other) {}
1486 };
1487
1488 // or node
1489 #include "internal/_flow_graph_or_impl.h"
1490
1491 template<typename InputTuple>
1492 class or_node : public internal::unfolded_or_node<InputTuple> {
1493 private:
1494     static const int N = std::tuple_size<InputTuple>::value;
1495 public:
1496     typedef typename internal::or_output_type<N,InputTuple>::type output_type;
1497     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
1498     or_node() : unfolded_type() { }
1499     // Copy constructor
1500     or_node( const or_node& /*other*/ ) : unfolded_type() { }
1501 };
1502
1503 //! Makes an edge between a single predecessor and a single successor
1504 template< typename T >
1505 inline void make_edge( sender<T> &p, receiver<T> &s ) {
1506     p.register_successor( s );
1507 }
1508         
1509 //! Makes an edge between a single predecessor and a single successor
1510 template< typename T >
1511 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
1512     p.remove_successor( s );
1513 }
1514
1515 //! Returns a copy of the body from a function or continue node
1516 template< typename Body, typename Node >
1517 Body copy_body( Node &n ) {
1518     return n.template copy_function_object<Body>();
1519 }
1520         
1521         
1522 } // interface6
1523
1524     using interface6::graph;
1525     using interface6::graph_node;
1526     using interface6::continue_msg;
1527     using interface6::sender;
1528     using interface6::receiver;
1529     using interface6::continue_receiver;
1530
1531     using interface6::source_node;
1532     using interface6::function_node;
1533     using interface6::continue_node;
1534     using interface6::overwrite_node;
1535     using interface6::write_once_node;
1536     using interface6::broadcast_node;
1537     using interface6::buffer_node;
1538     using interface6::queue_node;
1539     using interface6::sequencer_node;
1540     using interface6::priority_queue_node;
1541     using interface6::limiter_node;
1542     using namespace interface6::internal::graph_policy_namespace;
1543     using interface6::join_node;
1544     using interface6::or_node;
1545     using interface6::input_port;
1546     using interface6::copy_body; 
1547     using interface6::make_edge; 
1548     using interface6::remove_edge; 
1549     using interface6::internal::NO_TAG;
1550     using interface6::internal::tag_value;
1551
1552 } // graph
1553 } // tbb
1554
1555 #endif
1556