]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/flow_graph.h
68626e8a03fdb6562a71330fe20e1a5a7ab10035
[casparcg] / dependencies64 / tbb / include / tbb / flow_graph.h
1 /*
2     Copyright 2005-2014 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5     you can redistribute it and/or modify it under the terms of the GNU General Public License
6     version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
7     distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9     See  the GNU General Public License for more details.   You should have received a copy of
10     the  GNU General Public License along with Threading Building Blocks; if not, write to the
11     Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA
12
13     As a special exception,  you may use this file  as part of a free software library without
14     restriction.  Specifically,  if other files instantiate templates  or use macros or inline
15     functions from this file, or you compile this file and link it with other files to produce
16     an executable,  this file does not by itself cause the resulting executable to be covered
17     by the GNU General Public License. This exception does not however invalidate any other
18     reasons why the executable file might be covered by the GNU General Public License.
19 */
20
21 #ifndef __TBB_flow_graph_H
22 #define __TBB_flow_graph_H
23
24 #include "tbb_stddef.h"
25 #include "atomic.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "task.h"
31 #include "cache_aligned_allocator.h"
32 #include "tbb_exception.h"
33 #include "internal/_aggregator_impl.h"
34 #include "tbb_profiling.h"
35
36 #if TBB_DEPRECATED_FLOW_ENQUEUE
37 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
38 #else
39 #define FLOW_SPAWN(a) tbb::task::spawn((a))
40 #endif
41
42 // use the VC10 or gcc version of tuple if it is available.
43 #if __TBB_CPP11_TUPLE_PRESENT
44     #include <tuple>
45 namespace tbb {
46     namespace flow {
47         using std::tuple;
48         using std::tuple_size;
49         using std::tuple_element;
50         using std::get;
51     }
52 }
53 #else
54     #include "compat/tuple"
55 #endif
56
57 #include<list>
58 #include<queue>
59
60 /** @file
61   \brief The graph related classes and functions
62
63   There are some applications that best express dependencies as messages
64   passed between nodes in a graph.  These messages may contain data or
65   simply act as signals that a predecessors has completed. The graph
66   class and its associated node classes can be used to express such
67   applications.
68 */
69
70 namespace tbb {
71 namespace flow {
72
73 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
74 enum concurrency { unlimited = 0, serial = 1 };
75
76 namespace interface7 {
77
78 namespace internal {
79     template<typename T, typename M> class successor_cache;
80     template<typename T, typename M> class broadcast_cache;
81     template<typename T, typename M> class round_robin_cache;
82 }
83
84 //! An empty class used for messages that mean "I'm done"
85 class continue_msg {};
86
87 template< typename T > class sender;
88 template< typename T > class receiver;
89 class continue_receiver;
90
91 //! Pure virtual template class that defines a sender of messages of type T
92 template< typename T >
93 class sender {
94 public:
95     //! The output type of this sender
96     typedef T output_type;
97
98     //! The successor type for this node
99     typedef receiver<T> successor_type;
100
101     virtual ~sender() {}
102
103     //! Add a new successor to this node
104     virtual bool register_successor( successor_type &r ) = 0;
105
106     //! Removes a successor from this node
107     virtual bool remove_successor( successor_type &r ) = 0;
108
109     //! Request an item from the sender
110     virtual bool try_get( T & ) { return false; }
111
112     //! Reserves an item in the sender
113     virtual bool try_reserve( T & ) { return false; }
114
115     //! Releases the reserved item
116     virtual bool try_release( ) { return false; }
117
118     //! Consumes the reserved item
119     virtual bool try_consume( ) { return false; }
120
121 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
122     //! interface to record edges for traversal & deletion
123     virtual void    internal_add_built_successor( successor_type & )    = 0;
124     virtual void    internal_delete_built_successor( successor_type & ) = 0;
125     virtual void    copy_successors( std::vector<successor_type *> &)   = 0;
126     virtual size_t  successor_count()                                   = 0;
127 #endif
128 };
129
130 template< typename T > class limiter_node;  // needed for resetting decrementer
131 template< typename R, typename B > class run_and_put_task;
132
133 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
134
135 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
136 // flags to modify the behavior of the graph reset().  Can be combined.
137 enum reset_flags {
138     rf_reset_protocol   = 0,
139     rf_reset_bodies     = 1<<0,  // delete the current node body, reset to a copy of the initial node body.
140     rf_extract          = 1<<1   // delete edges (extract() for single node, reset() for graph.)
141 };
142
143 #define __TBB_PFG_RESET_ARG(exp) exp
144 #define __TBB_COMMA ,
145 #else
146 #define __TBB_PFG_RESET_ARG(exp)  /* nothing */
147 #define __TBB_COMMA /* nothing */
148 #endif
149
150 // enqueue left task if necessary.  Returns the non-enqueued task if there is one.
151 static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
152     // if no RHS task, don't change left.
153     if(right == NULL) return left;
154     // right != NULL
155     if(left == NULL) return right;
156     if(left == SUCCESSFULLY_ENQUEUED) return right;
157     // left contains a task
158     if(right != SUCCESSFULLY_ENQUEUED) {
159         // both are valid tasks
160         FLOW_SPAWN(*left);
161         return right;
162     }
163     return left;
164 }
165
166 //! Pure virtual template class that defines a receiver of messages of type T
167 template< typename T >
168 class receiver {
169 public:
170     //! The input type of this receiver
171     typedef T input_type;
172
173     //! The predecessor type for this node
174     typedef sender<T> predecessor_type;
175
176     //! Destructor
177     virtual ~receiver() {}
178
179     //! Put an item to the receiver
180     bool try_put( const T& t ) {
181         task *res = try_put_task(t);
182         if(!res) return false;
183         if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
184         return true;
185     }
186
187     //! put item to successor; return task to run the successor if possible.
188 protected:
189     template< typename R, typename B > friend class run_and_put_task;
190     template<typename X, typename Y> friend class internal::broadcast_cache;
191     template<typename X, typename Y> friend class internal::round_robin_cache;
192     virtual task *try_put_task(const T& t) = 0;
193 public:
194
195     //! Add a predecessor to the node
196     virtual bool register_predecessor( predecessor_type & ) { return false; }
197
198     //! Remove a predecessor from the node
199     virtual bool remove_predecessor( predecessor_type & ) { return false; }
200
201 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
202     virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
203     virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
204     virtual void   copy_predecessors( std::vector<predecessor_type *> & )  = 0;
205     virtual size_t predecessor_count()                                     = 0;
206 #endif
207
208 protected:
209     //! put receiver back in initial state
210     template<typename U> friend class limiter_node;
211     virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol ) ) = 0;
212
213     template<typename TT, typename M>
214         friend class internal::successor_cache;
215     virtual bool is_continue_receiver() { return false; }
216 };
217
218 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
219 //* holder of edges both for caches and for those nodes which do not have predecessor caches.
220 // C == receiver< ... > or sender< ... >, depending.
221 template<typename C>
222 class edge_container {
223
224 public:
225     typedef std::vector<C *> edge_vector;
226
227     void add_edge( C &s) {
228         built_edges.push_back( &s );
229     }
230
231     void delete_edge( C &s) {
232         for ( typename edge_vector::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
233             if ( *i == &s )  {
234                 (void)built_edges.erase(i);
235                 return;  // only remove one predecessor per request
236             }
237         }
238     }
239
240     void copy_edges( edge_vector &v) {
241         v = built_edges;
242     }
243
244     size_t edge_count() {
245         return (size_t)(built_edges.size());
246     }
247
248     void clear() {
249         built_edges.clear();
250     }
251
252     template< typename S > void sender_extract( S &s ); 
253     template< typename R > void receiver_extract( R &r ); 
254     
255 private: 
256     edge_vector built_edges;
257 };
258 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
259
260 //! Base class for receivers of completion messages
261 /** These receivers automatically reset, but cannot be explicitly waited on */
262 class continue_receiver : public receiver< continue_msg > {
263 public:
264
265     //! The input type
266     typedef continue_msg input_type;
267
268     //! The predecessor type for this node
269     typedef sender< continue_msg > predecessor_type;
270
271     //! Constructor
272     continue_receiver( int number_of_predecessors = 0 ) {
273         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
274         my_current_count = 0;
275     }
276
277     //! Copy constructor
278     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
279         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
280         my_current_count = 0;
281     }
282
283     //! Destructor
284     virtual ~continue_receiver() { }
285
286     //! Increments the trigger threshold
287     /* override */ bool register_predecessor( predecessor_type & ) {
288         spin_mutex::scoped_lock l(my_mutex);
289         ++my_predecessor_count;
290         return true;
291     }
292
293     //! Decrements the trigger threshold
294     /** Does not check to see if the removal of the predecessor now makes the current count
295         exceed the new threshold.  So removing a predecessor while the graph is active can cause
296         unexpected results. */
297     /* override */ bool remove_predecessor( predecessor_type & ) {
298         spin_mutex::scoped_lock l(my_mutex);
299         --my_predecessor_count;
300         return true;
301     }
302
303 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
304     typedef std::vector<predecessor_type *> predecessor_vector_type;
305
306     /*override*/ void internal_add_built_predecessor( predecessor_type &s) {
307         spin_mutex::scoped_lock l(my_mutex);
308         my_built_predecessors.add_edge( s );
309     }
310
311     /*override*/ void internal_delete_built_predecessor( predecessor_type &s) {
312         spin_mutex::scoped_lock l(my_mutex);
313         my_built_predecessors.delete_edge(s);
314     }
315
316     /*override*/ void copy_predecessors( predecessor_vector_type &v) {
317         spin_mutex::scoped_lock l(my_mutex);
318         my_built_predecessors.copy_edges(v);
319     }
320
321     /*override*/ size_t predecessor_count() {
322         spin_mutex::scoped_lock l(my_mutex);
323         return my_built_predecessors.edge_count();
324     }
325 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
326     
327 protected:
328     template< typename R, typename B > friend class run_and_put_task;
329     template<typename X, typename Y> friend class internal::broadcast_cache;
330     template<typename X, typename Y> friend class internal::round_robin_cache;
331     // execute body is supposed to be too small to create a task for.
332     /* override */ task *try_put_task( const input_type & ) {
333         {
334             spin_mutex::scoped_lock l(my_mutex);
335             if ( ++my_current_count < my_predecessor_count )
336                 return SUCCESSFULLY_ENQUEUED;
337             else
338                 my_current_count = 0;
339         }
340         task * res = execute();
341         if(!res) return SUCCESSFULLY_ENQUEUED;
342         return res;
343     }
344
345 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
346     edge_container<predecessor_type> my_built_predecessors;
347 #endif
348     spin_mutex my_mutex;
349     int my_predecessor_count;
350     int my_current_count;
351     int my_initial_predecessor_count;
352     // the friend declaration in the base class did not eliminate the "protected class"
353     // error in gcc 4.1.2
354     template<typename U> friend class limiter_node;
355     /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) )
356     {
357         my_current_count = 0;
358 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
359         if(f & rf_extract) {
360             my_built_predecessors.receiver_extract(*this);
361             my_predecessor_count = my_initial_predecessor_count;
362         }
363 #endif
364     }
365
366     //! Does whatever should happen when the threshold is reached
367     /** This should be very fast or else spawn a task.  This is
368         called while the sender is blocked in the try_put(). */
369     virtual task * execute() = 0;
370     template<typename TT, typename M>
371         friend class internal::successor_cache;
372     /*override*/ bool is_continue_receiver() { return true; }
373 };
374 }  // interface7
375 }  // flow
376 }  // tbb
377
378 #include "internal/_flow_graph_trace_impl.h"
379
380 namespace tbb {
381 namespace flow {
382 namespace interface7 {
383
384 #include "internal/_flow_graph_types_impl.h"
385 #include "internal/_flow_graph_impl.h"
386 using namespace internal::graph_policy_namespace;
387
388 class graph;
389 class graph_node;
390
391 template <typename GraphContainerType, typename GraphNodeType>
392 class graph_iterator {
393     friend class graph;
394     friend class graph_node;
395 public:
396     typedef size_t size_type;
397     typedef GraphNodeType value_type;
398     typedef GraphNodeType* pointer;
399     typedef GraphNodeType& reference;
400     typedef const GraphNodeType& const_reference;
401     typedef std::forward_iterator_tag iterator_category;
402
403     //! Default constructor
404     graph_iterator() : my_graph(NULL), current_node(NULL) {}
405
406     //! Copy constructor
407     graph_iterator(const graph_iterator& other) :
408         my_graph(other.my_graph), current_node(other.current_node)
409     {}
410
411     //! Assignment
412     graph_iterator& operator=(const graph_iterator& other) {
413         if (this != &other) {
414             my_graph = other.my_graph;
415             current_node = other.current_node;
416         }
417         return *this;
418     }
419
420     //! Dereference
421     reference operator*() const;
422
423     //! Dereference
424     pointer operator->() const;
425
426     //! Equality
427     bool operator==(const graph_iterator& other) const {
428         return ((my_graph == other.my_graph) && (current_node == other.current_node));
429     }
430
431     //! Inequality
432     bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
433
434     //! Pre-increment
435     graph_iterator& operator++() {
436         internal_forward();
437         return *this;
438     }
439
440     //! Post-increment
441     graph_iterator operator++(int) {
442         graph_iterator result = *this;
443         operator++();
444         return result;
445     }
446
447 private:
448     // the graph over which we are iterating
449     GraphContainerType *my_graph;
450     // pointer into my_graph's my_nodes list
451     pointer current_node;
452
453     //! Private initializing constructor for begin() and end() iterators
454     graph_iterator(GraphContainerType *g, bool begin);
455     void internal_forward();
456 };
457
458 //! The graph class
459 /** This class serves as a handle to the graph */
460 class graph : tbb::internal::no_copy {
461     friend class graph_node;
462
463     template< typename Body >
464     class run_task : public task {
465     public:
466         run_task( Body& body ) : my_body(body) {}
467         task *execute() {
468             my_body();
469             return NULL;
470         }
471     private:
472         Body my_body;
473     };
474
475     template< typename Receiver, typename Body >
476     class run_and_put_task : public task {
477     public:
478         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
479         task *execute() {
480             task *res = my_receiver.try_put_task( my_body() );
481             if(res == SUCCESSFULLY_ENQUEUED) res = NULL;
482             return res;
483         }
484     private:
485         Receiver &my_receiver;
486         Body my_body;
487     };
488
489 public:
490     //! Constructs a graph with isolated task_group_context
491     explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
492     {
493         own_context = true;
494         cancelled = false;
495         caught_exception = false;
496         my_context = new task_group_context();
497         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
498         my_root_task->set_ref_count(1);
499         tbb::internal::fgt_graph( this );
500 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
501         my_is_active = true;
502 #endif
503     }
504
505     //! Constructs a graph with use_this_context as context
506     explicit graph(task_group_context& use_this_context) :
507     my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
508     {
509         own_context = false;
510         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
511         my_root_task->set_ref_count(1);
512         tbb::internal::fgt_graph( this );
513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
514         my_is_active = true;
515 #endif
516     }
517
518     //! Destroys the graph.
519     /** Calls wait_for_all, then destroys the root task and context. */
520     ~graph() {
521         wait_for_all();
522         my_root_task->set_ref_count(0);
523         task::destroy( *my_root_task );
524         if (own_context) delete my_context;
525     }
526
527 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
528     void set_name( const char *name ) {
529         tbb::internal::fgt_graph_desc( this, name );
530     }
531 #endif
532
533     //! Used to register that an external entity may still interact with the graph.
534     /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
535         is made. */
536     void increment_wait_count() {
537         if (my_root_task)
538             my_root_task->increment_ref_count();
539     }
540
541     //! Deregisters an external entity that may have interacted with the graph.
542     /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
543         matches the number of increment_wait_count calls. */
544     void decrement_wait_count() {
545         if (my_root_task)
546             my_root_task->decrement_ref_count();
547     }
548
549     //! Spawns a task that runs a body and puts its output to a specific receiver
550     /** The task is spawned as a child of the graph. This is useful for running tasks
551         that need to block a wait_for_all() on the graph.  For example a one-off source. */
552     template< typename Receiver, typename Body >
553         void run( Receiver &r, Body body ) {
554        FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
555                    run_and_put_task< Receiver, Body >( r, body )) );
556     }
557
558     //! Spawns a task that runs a function object
559     /** The task is spawned as a child of the graph. This is useful for running tasks
560         that need to block a wait_for_all() on the graph. For example a one-off source. */
561     template< typename Body >
562     void run( Body body ) {
563        FLOW_SPAWN( * new ( task::allocate_additional_child_of( *my_root_task ) ) run_task< Body >( body ) );
564     }
565
566     //! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
567     /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
568     void wait_for_all() {
569         cancelled = false;
570         caught_exception = false;
571         if (my_root_task) {
572 #if TBB_USE_EXCEPTIONS
573             try {
574 #endif
575                 my_root_task->wait_for_all();
576                 cancelled = my_context->is_group_execution_cancelled();
577 #if TBB_USE_EXCEPTIONS
578             }
579             catch(...) {
580                 my_root_task->set_ref_count(1);
581                 my_context->reset();
582                 caught_exception = true;
583                 cancelled = true;
584                 throw;
585             }
586 #endif
587             my_context->reset();  // consistent with behavior in catch()
588             my_root_task->set_ref_count(1);
589         }
590     }
591
592     //! Returns the root task of the graph
593     task * root_task() {
594 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
595         if (!my_is_active) 
596             return NULL;
597         else
598 #endif
599             return my_root_task;
600     }
601
602 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
603     void set_active(bool a = true) {
604        my_is_active = a;
605     }
606
607     bool is_active() {
608        return my_is_active;
609     }
610 #endif
611
612     // ITERATORS
613     template<typename C, typename N>
614     friend class graph_iterator;
615
616     // Graph iterator typedefs
617     typedef graph_iterator<graph,graph_node> iterator;
618     typedef graph_iterator<const graph,const graph_node> const_iterator;
619
620     // Graph iterator constructors
621     //! start iterator
622     iterator begin() { return iterator(this, true); }
623     //! end iterator
624     iterator end() { return iterator(this, false); }
625      //! start const iterator
626     const_iterator begin() const { return const_iterator(this, true); }
627     //! end const iterator
628     const_iterator end() const { return const_iterator(this, false); }
629     //! start const iterator
630     const_iterator cbegin() const { return const_iterator(this, true); }
631     //! end const iterator
632     const_iterator cend() const { return const_iterator(this, false); }
633
634     //! return status of graph execution
635     bool is_cancelled() { return cancelled; }
636     bool exception_thrown() { return caught_exception; }
637
638     // thread-unsafe state reset.
639     void reset(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol));
640
641 private:
642     task *my_root_task;
643     task_group_context *my_context;
644     bool own_context;
645     bool cancelled;
646     bool caught_exception;
647 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
648     bool my_is_active;
649 #endif
650
651
652     graph_node *my_nodes, *my_nodes_last;
653
654     spin_mutex nodelist_mutex;
655     void register_node(graph_node *n);
656     void remove_node(graph_node *n);
657
658 };  // class graph
659
660 template <typename C, typename N>
661 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
662 {
663     if (begin) current_node = my_graph->my_nodes;
664     //else it is an end iterator by default
665 }
666
667 template <typename C, typename N>
668 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
669     __TBB_ASSERT(current_node, "graph_iterator at end");
670     return *operator->();
671 }
672
673 template <typename C, typename N>
674 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
675     return current_node;
676 }
677
678
679 template <typename C, typename N>
680 void graph_iterator<C,N>::internal_forward() {
681     if (current_node) current_node = current_node->next;
682 }
683
684 //! The base of all graph nodes.
685 class graph_node : tbb::internal::no_assign {
686     friend class graph;
687     template<typename C, typename N>
688     friend class graph_iterator;
689 protected:
690     graph& my_graph;
691     graph_node *next, *prev;
692 public:
693     graph_node(graph& g) : my_graph(g) {
694         my_graph.register_node(this);
695     }
696     virtual ~graph_node() {
697         my_graph.remove_node(this);
698     }
699
700 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
701     virtual void set_name( const char *name ) = 0;
702 #endif
703
704 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
705     virtual void extract( reset_flags f=rf_extract ) {
706         bool a = my_graph.is_active();
707         my_graph.set_active(false);
708         reset((reset_flags)(f|rf_extract));
709         my_graph.set_active(a);
710     }
711 #endif
712
713 protected:
714     virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol)) = 0;
715 };
716
717 inline void graph::register_node(graph_node *n) {
718     n->next = NULL;
719     {
720         spin_mutex::scoped_lock lock(nodelist_mutex);
721         n->prev = my_nodes_last;
722         if (my_nodes_last) my_nodes_last->next = n;
723         my_nodes_last = n;
724         if (!my_nodes) my_nodes = n;
725     }
726 }
727
728 inline void graph::remove_node(graph_node *n) {
729     {
730         spin_mutex::scoped_lock lock(nodelist_mutex);
731         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
732         if (n->prev) n->prev->next = n->next;
733         if (n->next) n->next->prev = n->prev;
734         if (my_nodes_last == n) my_nodes_last = n->prev;
735         if (my_nodes == n) my_nodes = n->next;
736     }
737     n->prev = n->next = NULL;
738 }
739
740 inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
741     // reset context
742     task *saved_my_root_task = my_root_task;
743     my_root_task = NULL;
744     if(my_context) my_context->reset();
745     cancelled = false;
746     caught_exception = false;
747     // reset all the nodes comprising the graph
748     for(iterator ii = begin(); ii != end(); ++ii) {
749         graph_node *my_p = &(*ii);
750         my_p->reset(__TBB_PFG_RESET_ARG(f));
751     }
752     my_root_task = saved_my_root_task;
753 }
754
755
756 #include "internal/_flow_graph_node_impl.h"
757
758 //! An executable node that acts as a source, i.e. it has no predecessors
759 template < typename Output >
760 class source_node : public graph_node, public sender< Output > {
761 protected:
762     using graph_node::my_graph;
763 public:
764     //! The type of the output message, which is complete
765     typedef Output output_type;
766
767     //! The type of successors of this node
768     typedef receiver< Output > successor_type;
769
770 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
771     typedef std::vector<successor_type *> successor_vector_type;
772 #endif
773
774     //! Constructor for a node with a successor
775     template< typename Body >
776     source_node( graph &g, Body body, bool is_active = true )
777         : graph_node(g), my_active(is_active), init_my_active(is_active),
778         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
779         my_reserved(false), my_has_cached_item(false)
780     {
781         my_successors.set_owner(this);
782         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
783                                            static_cast<sender<output_type> *>(this), this->my_body );
784     }
785
786     //! Copy constructor
787     source_node( const source_node& src ) :
788         graph_node(src.my_graph), sender<Output>(),
789         my_active(src.init_my_active),
790         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
791         my_reserved(false), my_has_cached_item(false)
792     {
793         my_successors.set_owner(this);
794         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
795                                            static_cast<sender<output_type> *>(this), this->my_body );
796     }
797
798     //! The destructor
799     ~source_node() { delete my_body; }
800
801 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
802     /* override */ void set_name( const char *name ) {
803         tbb::internal::fgt_node_desc( this, name );
804     }
805 #endif
806
807     //! Add a new successor to this node
808     /* override */ bool register_successor( successor_type &r ) {
809         spin_mutex::scoped_lock lock(my_mutex);
810         my_successors.register_successor(r);
811         if ( my_active )
812             spawn_put();
813         return true;
814     }
815
816     //! Removes a successor from this node
817     /* override */ bool remove_successor( successor_type &r ) {
818         spin_mutex::scoped_lock lock(my_mutex);
819         my_successors.remove_successor(r);
820         return true;
821     }
822
823 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
824     /*override*/void internal_add_built_successor( successor_type &r) {
825         spin_mutex::scoped_lock lock(my_mutex);
826         my_successors.internal_add_built_successor(r);
827     }
828
829     /*override*/void internal_delete_built_successor( successor_type &r) {
830         spin_mutex::scoped_lock lock(my_mutex);
831         my_successors.internal_delete_built_successor(r);
832     }
833
834     /*override*/size_t successor_count() {
835         spin_mutex::scoped_lock lock(my_mutex);
836         return my_successors.successor_count();
837     }
838
839     /*override*/void copy_successors(successor_vector_type &v) {
840         spin_mutex::scoped_lock l(my_mutex);
841         my_successors.copy_successors(v);
842     }
843 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
844
845     //! Request an item from the node
846     /*override */ bool try_get( output_type &v ) {
847         spin_mutex::scoped_lock lock(my_mutex);
848         if ( my_reserved )
849             return false;
850
851         if ( my_has_cached_item ) {
852             v = my_cached_item;
853             my_has_cached_item = false;
854             return true;
855         }
856         // we've been asked to provide an item, but we have none.  enqueue a task to
857         // provide one.
858         spawn_put();
859         return false;
860     }
861
862     //! Reserves an item.
863     /* override */ bool try_reserve( output_type &v ) {
864         spin_mutex::scoped_lock lock(my_mutex);
865         if ( my_reserved ) {
866             return false;
867         }
868
869         if ( my_has_cached_item ) {
870             v = my_cached_item;
871             my_reserved = true;
872             return true;
873         } else {
874             return false;
875         }
876     }
877
878     //! Release a reserved item.
879     /** true = item has been released and so remains in sender, dest must request or reserve future items */
880     /* override */ bool try_release( ) {
881         spin_mutex::scoped_lock lock(my_mutex);
882         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
883         my_reserved = false;
884         if(!my_successors.empty())
885             spawn_put();
886         return true;
887     }
888
889     //! Consumes a reserved item
890     /* override */ bool try_consume( ) {
891         spin_mutex::scoped_lock lock(my_mutex);
892         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
893         my_reserved = false;
894         my_has_cached_item = false;
895         if ( !my_successors.empty() ) {
896             spawn_put();
897         }
898         return true;
899     }
900
901     //! Activates a node that was created in the inactive state
902     void activate() {
903         spin_mutex::scoped_lock lock(my_mutex);
904         my_active = true;
905         if ( !my_successors.empty() )
906             spawn_put();
907     }
908
909     template<typename Body>
910     Body copy_function_object() {
911         internal::source_body<output_type> &body_ref = *this->my_body;
912         return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
913     }
914
915 protected:
916
917     //! resets the source_node to its initial state
918     void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
919         my_active = init_my_active;
920         my_reserved =false;
921         if(my_has_cached_item) {
922             my_has_cached_item = false;
923         }
924 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
925         my_successors.reset(f);
926         if(f & rf_reset_bodies) my_body->reset_body();
927 #endif
928     }
929
930 private:
931     spin_mutex my_mutex;
932     bool my_active;
933     bool init_my_active;
934     internal::source_body<output_type> *my_body;
935     internal::broadcast_cache< output_type > my_successors;
936     bool my_reserved;
937     bool my_has_cached_item;
938     output_type my_cached_item;
939
940     // used by apply_body, can invoke body of node.
941     bool try_reserve_apply_body(output_type &v) {
942         spin_mutex::scoped_lock lock(my_mutex);
943         if ( my_reserved ) {
944             return false;
945         }
946         if ( !my_has_cached_item ) {
947             tbb::internal::fgt_begin_body( my_body );
948             bool r = (*my_body)(my_cached_item);
949             tbb::internal::fgt_end_body( my_body );
950             if (r) {
951                 my_has_cached_item = true;
952             }
953         }
954         if ( my_has_cached_item ) {
955             v = my_cached_item;
956             my_reserved = true;
957             return true;
958         } else {
959             return false;
960         }
961     }
962
963     //! Spawns a task that applies the body
964     /* override */ void spawn_put( ) {
965         task* tp = this->my_graph.root_task();
966         if(tp) {
967             FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
968                         internal:: source_task_bypass < source_node< output_type > >( *this ) ) );
969         }
970     }
971
972     friend class internal::source_task_bypass< source_node< output_type > >;
973     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
974     /* override */ task * apply_body_bypass( ) {
975         output_type v;
976         if ( !try_reserve_apply_body(v) )
977             return NULL;
978
979         task *last_task = my_successors.try_put_task(v);
980         if ( last_task )
981             try_consume();
982         else
983             try_release();
984         return last_task;
985     }
986 };  // source_node
987
988 //! Implements a function node that supports Input -> Output
989 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
990 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
991 protected:
992     using graph_node::my_graph;
993 public:
994     typedef Input input_type;
995     typedef Output output_type;
996     typedef sender< input_type > predecessor_type;
997     typedef receiver< output_type > successor_type;
998     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
999     typedef internal::function_output<output_type> fOutput_type;
1000 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1001     using typename internal::function_input<Input,Output,Allocator>::predecessor_vector_type;
1002     using typename internal::function_output<Output>::successor_vector_type;
1003 #endif
1004
1005     //! Constructor
1006     template< typename Body >
1007     function_node( graph &g, size_t concurrency, Body body ) :
1008         graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body) {
1009         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1010                                            static_cast<sender<output_type> *>(this), this->my_body );
1011     }
1012
1013     //! Copy constructor
1014     function_node( const function_node& src ) :
1015         graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
1016         fOutput_type() {
1017         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph, static_cast<receiver<input_type> *>(this),
1018                                            static_cast<sender<output_type> *>(this), this->my_body );
1019     }
1020
1021 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1022     /* override */ void set_name( const char *name ) {
1023         tbb::internal::fgt_node_desc( this, name );
1024     }
1025 #endif
1026
1027 protected:
1028     template< typename R, typename B > friend class run_and_put_task;
1029     template<typename X, typename Y> friend class internal::broadcast_cache;
1030     template<typename X, typename Y> friend class internal::round_robin_cache;
1031     using fInput_type::try_put_task;
1032
1033     // override of graph_node's reset.
1034     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1035         fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
1036 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1037         successors().reset(f);
1038         __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
1039         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1040 #endif
1041     }
1042
1043     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1044 };
1045
1046 //! Implements a function node that supports Input -> Output
1047 template < typename Input, typename Output, typename Allocator >
1048 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
1049 protected:
1050     using graph_node::my_graph;
1051 public:
1052     typedef Input input_type;
1053     typedef Output output_type;
1054     typedef sender< input_type > predecessor_type;
1055     typedef receiver< output_type > successor_type;
1056     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
1057     typedef internal::function_input_queue<input_type, Allocator> queue_type;
1058     typedef internal::function_output<output_type> fOutput_type;
1059 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1060     using typename internal::function_input<Input,Output,Allocator>::predecessor_vector_type;
1061     using typename internal::function_output<Output>::successor_vector_type;
1062 #endif
1063
1064     //! Constructor
1065     template< typename Body >
1066     function_node( graph &g, size_t concurrency, Body body ) :
1067         graph_node(g), fInput_type( g, concurrency, body, new queue_type() ) {
1068         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1069                                            static_cast<sender<output_type> *>(this), this->my_body );
1070     }
1071
1072     //! Copy constructor
1073     function_node( const function_node& src ) :
1074         graph_node(src.graph_node::my_graph), fInput_type( src, new queue_type() ), fOutput_type() {
1075         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1076                                            static_cast<sender<output_type> *>(this), this->my_body );
1077     }
1078
1079 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1080     /* override */ void set_name( const char *name ) {
1081         tbb::internal::fgt_node_desc( this, name );
1082     }
1083 #endif
1084
1085 protected:
1086     template< typename R, typename B > friend class run_and_put_task;
1087     template<typename X, typename Y> friend class internal::broadcast_cache;
1088     template<typename X, typename Y> friend class internal::round_robin_cache;
1089     using fInput_type::try_put_task;
1090
1091     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1092         fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
1093 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1094         successors().reset(f);
1095         __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
1096         __TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "function_node predecessors not empty");
1097 #endif
1098
1099     }
1100
1101     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1102 };
1103
1104 //! implements a function node that supports Input -> (set of outputs)
1105 // Output is a tuple of output types.
1106 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1107 class multifunction_node :
1108     public graph_node,
1109     public internal::multifunction_input
1110     <
1111         Input,
1112         typename internal::wrap_tuple_elements<
1113             tbb::flow::tuple_size<Output>::value,  // #elements in tuple
1114             internal::multifunction_output,  // wrap this around each element
1115             Output // the tuple providing the types
1116         >::type,
1117         Allocator
1118     > {
1119 protected:
1120     using graph_node::my_graph;
1121 private:
1122     static const int N = tbb::flow::tuple_size<Output>::value;
1123 public:
1124     typedef Input input_type;
1125     typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
1126 private:
1127     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1128     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
1129 public:
1130     template<typename Body>
1131     multifunction_node( graph &g, size_t concurrency, Body body ) :
1132         graph_node(g), base_type(g,concurrency, body) {
1133         tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1134                                                                  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1135                                                                  this->output_ports(), this->my_body );
1136     }
1137
1138     multifunction_node( const multifunction_node &other) :
1139         graph_node(other.graph_node::my_graph), base_type(other) {
1140         tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1141                                                                  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1142                                                                  this->output_ports(), this->my_body );
1143     }
1144
1145 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1146     /* override */ void set_name( const char *name ) {
1147         tbb::internal::fgt_multioutput_node_desc( this, name );
1148     }
1149 #endif
1150
1151     // all the guts are in multifunction_input...
1152 protected:
1153     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
1154 };  // multifunction_node
1155
1156 template < typename Input, typename Output, typename Allocator >
1157 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
1158     typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::value, internal::multifunction_output, Output>::type, Allocator> {
1159 protected:
1160     using graph_node::my_graph;
1161     static const int N = tbb::flow::tuple_size<Output>::value;
1162 public:
1163     typedef Input input_type;
1164     typedef typename internal::wrap_tuple_elements<N, internal::multifunction_output, Output>::type output_ports_type;
1165 private:
1166     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1167     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
1168 public:
1169     template<typename Body>
1170     multifunction_node( graph &g, size_t concurrency, Body body) :
1171         graph_node(g), base_type(g,concurrency, body, new queue_type()) {
1172         tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1173                                                                  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1174                                                                  this->output_ports(), this->my_body );
1175     }
1176
1177     multifunction_node( const multifunction_node &other) :
1178         graph_node(other.graph_node::my_graph), base_type(other, new queue_type()) {
1179         tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1180                                                                  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1181                                                                  this->output_ports(), this->my_body );
1182     }
1183
1184 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1185     /* override */ void set_name( const char *name ) {
1186         tbb::internal::fgt_multioutput_node_desc( this, name );
1187     }
1188 #endif
1189
1190     // all the guts are in multifunction_input...
1191 protected:
1192     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
1193 };  // multifunction_node
1194
1195 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1196 //  successors.  The node has unlimited concurrency, so though it is marked as
1197 //  "rejecting" it does not reject inputs.
1198 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1199 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
1200     static const int N = tbb::flow::tuple_size<TupleType>::value;
1201     typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
1202 public:
1203     typedef typename base_type::output_ports_type output_ports_type;
1204 private:
1205     struct splitting_body {
1206         void operator()(const TupleType& t, output_ports_type &p) {
1207             internal::emit_element<N>::emit_this(t, p);
1208         }
1209     };
1210 public:
1211     typedef TupleType input_type;
1212     typedef Allocator allocator_type;
1213     split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
1214         tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1215                                                           static_cast<receiver<input_type> *>(this), this->output_ports() );
1216     }
1217
1218     split_node( const split_node & other) : base_type(other) {
1219         tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1220                                                           static_cast<receiver<input_type> *>(this), this->output_ports() );
1221     }
1222
1223 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1224     /* override */ void set_name( const char *name ) {
1225         tbb::internal::fgt_multioutput_node_desc( this, name );
1226     }
1227 #endif
1228
1229 };
1230
1231 //! Implements an executable node that supports continue_msg -> Output
1232 template <typename Output>
1233 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
1234 protected:
1235     using graph_node::my_graph;
1236 public:
1237     typedef continue_msg input_type;
1238     typedef Output output_type;
1239     typedef sender< input_type > predecessor_type;
1240     typedef receiver< output_type > successor_type;
1241     typedef internal::continue_input<Output> fInput_type;
1242     typedef internal::function_output<output_type> fOutput_type;
1243
1244     //! Constructor for executable node with continue_msg -> Output
1245     template <typename Body >
1246     continue_node( graph &g, Body body ) :
1247         graph_node(g), internal::continue_input<output_type>( g, body ) {
1248         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1249                                            static_cast<receiver<input_type> *>(this),
1250                                            static_cast<sender<output_type> *>(this), this->my_body );
1251     }
1252
1253
1254     //! Constructor for executable node with continue_msg -> Output
1255     template <typename Body >
1256     continue_node( graph &g, int number_of_predecessors, Body body ) :
1257         graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1258         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1259                                            static_cast<receiver<input_type> *>(this),
1260                                            static_cast<sender<output_type> *>(this), this->my_body );
1261     }
1262
1263     //! Copy constructor
1264     continue_node( const continue_node& src ) :
1265         graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
1266         internal::function_output<Output>() {
1267         tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1268                                            static_cast<receiver<input_type> *>(this),
1269                                            static_cast<sender<output_type> *>(this), this->my_body );
1270     }
1271
1272 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1273     /* override */ void set_name( const char *name ) {
1274         tbb::internal::fgt_node_desc( this, name );
1275     }
1276 #endif
1277
1278 protected:
1279     template< typename R, typename B > friend class run_and_put_task;
1280     template<typename X, typename Y> friend class internal::broadcast_cache;
1281     template<typename X, typename Y> friend class internal::round_robin_cache;
1282     using fInput_type::try_put_task;
1283
1284     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1285         fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f));
1286 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1287         successors().reset(f);
1288         __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_node not reset");
1289 #endif
1290     }
1291
1292     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1293 };  // continue_node
1294
1295 template< typename T >
1296 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
1297 protected:
1298     using graph_node::my_graph;
1299 public:
1300     typedef T input_type;
1301     typedef T output_type;
1302     typedef sender< input_type > predecessor_type;
1303     typedef receiver< output_type > successor_type;
1304 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1305     typedef std::vector<predecessor_type *> predecessor_vector_type;
1306     typedef std::vector<successor_type *> successor_vector_type;
1307 #endif
1308
1309     overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
1310         my_successors.set_owner( this );
1311         tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1312                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1313     }
1314
1315     // Copy constructor; doesn't take anything from src; default won't work
1316     overwrite_node( const overwrite_node& src ) :
1317         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
1318     {
1319         my_successors.set_owner( this );
1320         tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1321                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1322     }
1323
1324     ~overwrite_node() {}
1325
1326 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1327     /* override */ void set_name( const char *name ) {
1328         tbb::internal::fgt_node_desc( this, name );
1329     }
1330 #endif
1331
1332     /* override */ bool register_successor( successor_type &s ) {
1333         spin_mutex::scoped_lock l( my_mutex );
1334         task* tp = this->my_graph.root_task();  // just to test if we are resetting
1335         if (my_buffer_is_valid && tp) {
1336             // We have a valid value that must be forwarded immediately.
1337             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
1338                 // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
1339                 my_successors.register_successor( s );
1340             } else {
1341                 // We don't add the successor: it rejected our put and we became its predecessor instead
1342                 return false;
1343             }
1344         } else {
1345             // No valid value yet, just add as successor
1346             my_successors.register_successor( s );
1347         }
1348         return true;
1349     }
1350
1351     /* override */ bool remove_successor( successor_type &s ) {
1352         spin_mutex::scoped_lock l( my_mutex );
1353         my_successors.remove_successor(s);
1354         return true;
1355     }
1356
1357 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1358     /*override*/void internal_add_built_successor( successor_type &s) {
1359         spin_mutex::scoped_lock l( my_mutex );
1360         my_successors.internal_add_built_successor(s);
1361     }
1362
1363     /*override*/void internal_delete_built_successor( successor_type &s) {
1364         spin_mutex::scoped_lock l( my_mutex );
1365         my_successors.internal_delete_built_successor(s);
1366     }
1367
1368     /*override*/size_t successor_count() {
1369         spin_mutex::scoped_lock l( my_mutex );
1370         return my_successors.successor_count();
1371     }
1372
1373     /*override*/ void copy_successors(successor_vector_type &v) {
1374         spin_mutex::scoped_lock l( my_mutex );
1375         my_successors.copy_successors(v);
1376     }
1377
1378     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1379         spin_mutex::scoped_lock l( my_mutex );
1380         my_built_predecessors.add_edge(p);
1381     }
1382
1383     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1384         spin_mutex::scoped_lock l( my_mutex );
1385         my_built_predecessors.delete_edge(p);
1386     }
1387
1388     /*override*/size_t predecessor_count() {
1389         spin_mutex::scoped_lock l( my_mutex );
1390         return my_built_predecessors.edge_count();
1391     }
1392
1393     /*override*/void copy_predecessors(predecessor_vector_type &v) {
1394         spin_mutex::scoped_lock l( my_mutex );
1395         my_built_predecessors.copy_edges(v);
1396     }
1397 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1398
1399     /* override */ bool try_get( input_type &v ) {
1400         spin_mutex::scoped_lock l( my_mutex );
1401         if ( my_buffer_is_valid ) {
1402             v = my_buffer;
1403             return true;
1404         }
1405         return false;
1406     }
1407
1408     bool is_valid() {
1409        spin_mutex::scoped_lock l( my_mutex );
1410        return my_buffer_is_valid;
1411     }
1412
1413     void clear() {
1414        spin_mutex::scoped_lock l( my_mutex );
1415        my_buffer_is_valid = false;
1416     }
1417
1418 protected:
1419     template< typename R, typename B > friend class run_and_put_task;
1420     template<typename X, typename Y> friend class internal::broadcast_cache;
1421     template<typename X, typename Y> friend class internal::round_robin_cache;
1422     /* override */ task * try_put_task( const input_type &v ) {
1423         spin_mutex::scoped_lock l( my_mutex );
1424         my_buffer = v;
1425         my_buffer_is_valid = true;
1426         task * rtask = my_successors.try_put_task(v);
1427         if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1428         return rtask;
1429     }
1430
1431     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1432         my_buffer_is_valid = false;
1433 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1434         my_successors.reset(f);
1435        if (f&rf_extract) {
1436            my_built_predecessors.receiver_extract(*this);
1437        }
1438 #endif
1439     }
1440
1441     spin_mutex my_mutex;
1442     internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
1443 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1444     edge_container<sender<input_type> > my_built_predecessors;
1445 #endif
1446     input_type my_buffer;
1447     bool my_buffer_is_valid;
1448     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1449 };  // overwrite_node
1450
1451 template< typename T >
1452 class write_once_node : public overwrite_node<T> {
1453 public:
1454     typedef T input_type;
1455     typedef T output_type;
1456     typedef sender< input_type > predecessor_type;
1457     typedef receiver< output_type > successor_type;
1458
1459     //! Constructor
1460     write_once_node(graph& g) : overwrite_node<T>(g) {
1461         tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1462                                  static_cast<receiver<input_type> *>(this),
1463                                  static_cast<sender<output_type> *>(this) );
1464     }
1465
1466     //! Copy constructor: call base class copy constructor
1467     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {
1468         tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1469                                  static_cast<receiver<input_type> *>(this),
1470                                  static_cast<sender<output_type> *>(this) );
1471     }
1472
1473 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1474     /* override */ void set_name( const char *name ) {
1475         tbb::internal::fgt_node_desc( this, name );
1476     }
1477 #endif
1478
1479 protected:
1480     template< typename R, typename B > friend class run_and_put_task;
1481     template<typename X, typename Y> friend class internal::broadcast_cache;
1482     template<typename X, typename Y> friend class internal::round_robin_cache;
1483     /* override */ task *try_put_task( const T &v ) {
1484         spin_mutex::scoped_lock l( this->my_mutex );
1485         if ( this->my_buffer_is_valid ) {
1486             return NULL;
1487         } else {
1488             this->my_buffer = v;
1489             this->my_buffer_is_valid = true;
1490             task *res = this->my_successors.try_put_task(v);
1491             if(!res) res = SUCCESSFULLY_ENQUEUED;
1492             return res;
1493         }
1494     }
1495 };
1496
1497 //! Forwards messages of type T to all successors
1498 template <typename T>
1499 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1500 protected:
1501     using graph_node::my_graph;
1502 public:
1503     typedef T input_type;
1504     typedef T output_type;
1505     typedef sender< input_type > predecessor_type;
1506     typedef receiver< output_type > successor_type;
1507 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1508     typedef std::vector<predecessor_type *> predecessor_vector_type;
1509     typedef std::vector<successor_type *> successor_vector_type;
1510 #endif
1511 private:
1512     internal::broadcast_cache<input_type> my_successors;
1513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1514     edge_container<predecessor_type> my_built_predecessors;
1515     spin_mutex pred_mutex;
1516 #endif
1517 public:
1518
1519     broadcast_node(graph& g) : graph_node(g) {
1520         my_successors.set_owner( this );
1521         tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1522                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1523     }
1524
1525     // Copy constructor
1526     broadcast_node( const broadcast_node& src ) :
1527         graph_node(src.my_graph), receiver<T>(), sender<T>()
1528     {
1529         my_successors.set_owner( this );
1530         tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1531                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1532     }
1533
1534 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1535     /* override */ void set_name( const char *name ) {
1536         tbb::internal::fgt_node_desc( this, name );
1537     }
1538 #endif
1539
1540     //! Adds a successor
1541     virtual bool register_successor( receiver<T> &r ) {
1542         my_successors.register_successor( r );
1543         return true;
1544     }
1545
1546     //! Removes s as a successor
1547     virtual bool remove_successor( receiver<T> &r ) {
1548         my_successors.remove_successor( r );
1549         return true;
1550     }
1551
1552 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1553     /*override*/ void internal_add_built_successor(successor_type &r) {
1554         my_successors.internal_add_built_successor(r);
1555     }
1556
1557     /*override*/ void internal_delete_built_successor(successor_type &r) {
1558         my_successors.internal_delete_built_successor(r);
1559     }
1560
1561     /*override*/ size_t successor_count() {
1562         return my_successors.successor_count();
1563     }
1564
1565     /*override*/ void copy_successors(successor_vector_type &v) {
1566         my_successors.copy_successors(v);
1567     }
1568
1569     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1570         my_built_predecessors.add_edge(p);
1571     }
1572
1573     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1574         my_built_predecessors.delete_edge(p);
1575     }
1576
1577     /*override*/ size_t predecessor_count() {
1578         return my_built_predecessors.edge_count();
1579     }
1580
1581     /*override*/ void copy_predecessors(predecessor_vector_type &v) {
1582         my_built_predecessors.copy_edges(v);
1583     }
1584 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1585
1586 protected:
1587     template< typename R, typename B > friend class run_and_put_task;
1588     template<typename X, typename Y> friend class internal::broadcast_cache;
1589     template<typename X, typename Y> friend class internal::round_robin_cache;
1590     //! build a task to run the successor if possible.  Default is old behavior.
1591     /*override*/ task *try_put_task(const T& t) {
1592         task *new_task = my_successors.try_put_task(t);
1593         if(!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1594         return new_task;
1595     }
1596
1597     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1598 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1599         my_successors.reset(f);
1600         if (f&rf_extract) {
1601            my_built_predecessors.receiver_extract(*this);
1602         }
1603         __TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error resetting broadcast_node");
1604 #endif
1605     }
1606     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1607 };  // broadcast_node
1608
1609 //! Forwards messages in arbitrary order
1610 template <typename T, typename A=cache_aligned_allocator<T> >
1611 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1612 protected:
1613     using graph_node::my_graph;
1614 public:
1615     typedef T input_type;
1616     typedef T output_type;
1617     typedef sender< input_type > predecessor_type;
1618     typedef receiver< output_type > successor_type;
1619     typedef buffer_node<T, A> my_class;
1620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1621     typedef std::vector<predecessor_type *> predecessor_vector_type;
1622     typedef std::vector<successor_type *> successor_vector_type;
1623 #endif
1624 protected:
1625     typedef size_t size_type;
1626     internal::round_robin_cache< T, null_rw_mutex > my_successors;
1627
1628 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1629     edge_container<predecessor_type> my_built_predecessors;
1630 #endif
1631
1632     friend class internal::forward_task_bypass< buffer_node< T, A > >;
1633
1634     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1635 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1636         , add_blt_succ, del_blt_succ,
1637         add_blt_pred, del_blt_pred,
1638         blt_succ_cnt, blt_pred_cnt,
1639         blt_succ_cpy, blt_pred_cpy   // create vector copies of preds and succs
1640 #endif
1641     };
1642     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1643
1644     // implements the aggregator_operation concept
1645     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1646     public:
1647         char type;
1648 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1649         task * ltask;
1650         union {
1651             input_type *elem;
1652             successor_type *r;
1653             predecessor_type *p;
1654             size_t cnt_val;
1655             successor_vector_type *svec;
1656             predecessor_vector_type *pvec;
1657         };
1658 #else
1659         T *elem;
1660         task * ltask;
1661         successor_type *r;
1662 #endif
1663         buffer_operation(const T& e, op_type t) : type(char(t))
1664
1665 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1666                                                   , ltask(NULL), elem(const_cast<T*>(&e))
1667 #else
1668                                                   , elem(const_cast<T*>(&e)) , ltask(NULL)
1669 #endif
1670         {}
1671         buffer_operation(op_type t) : type(char(t)),  ltask(NULL) {}
1672     };
1673
1674     bool forwarder_busy;
1675     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1676     friend class internal::aggregating_functor<my_class, buffer_operation>;
1677     internal::aggregator< my_handler, buffer_operation> my_aggregator;
1678
1679     virtual void handle_operations(buffer_operation *op_list) {
1680         buffer_operation *tmp = NULL;
1681         bool try_forwarding=false;
1682         while (op_list) {
1683             tmp = op_list;
1684             op_list = op_list->next;
1685             switch (tmp->type) {
1686             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
1687             case rem_succ: internal_rem_succ(tmp); break;
1688             case req_item: internal_pop(tmp); break;
1689             case res_item: internal_reserve(tmp); break;
1690             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
1691             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
1692             case put_item: internal_push(tmp);  try_forwarding = (tmp->status == SUCCEEDED); break;
1693             case try_fwd_task: internal_forward_task(tmp); break;
1694 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1695             // edge recording
1696             case add_blt_succ: internal_add_built_succ(tmp); break;
1697             case del_blt_succ: internal_del_built_succ(tmp); break;
1698             case add_blt_pred: internal_add_built_pred(tmp); break;
1699             case del_blt_pred: internal_del_built_pred(tmp); break;
1700             case blt_succ_cnt: internal_succ_cnt(tmp); break;
1701             case blt_pred_cnt: internal_pred_cnt(tmp); break;
1702             case blt_succ_cpy: internal_copy_succs(tmp); break;
1703             case blt_pred_cpy: internal_copy_preds(tmp); break;
1704 #endif
1705             }
1706         }
1707         if (try_forwarding && !forwarder_busy) {
1708             task* tp = this->my_graph.root_task();
1709             if(tp) {
1710                 forwarder_busy = true;
1711                 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
1712                         forward_task_bypass
1713                         < buffer_node<input_type, A> >(*this);
1714                 // tmp should point to the last item handled by the aggregator.  This is the operation
1715                 // the handling thread enqueued.  So modifying that record will be okay.
1716                 tbb::task *z = tmp->ltask;
1717                 tmp->ltask = combine_tasks(z, new_task);  // in case the op generated a task
1718             }
1719         }
1720     }
1721
1722     inline task *grab_forwarding_task( buffer_operation &op_data) {
1723         return op_data.ltask;
1724     }
1725
1726     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1727         task *ft = grab_forwarding_task(op_data);
1728         if(ft) {
1729             FLOW_SPAWN(*ft);
1730             return true;
1731         }
1732         return false;
1733     }
1734
1735     //! This is executed by an enqueued task, the "forwarder"
1736     virtual task *forward_task() {
1737         buffer_operation op_data(try_fwd_task);
1738         task *last_task = NULL;
1739         do {
1740             op_data.status = WAIT;
1741             op_data.ltask = NULL;
1742             my_aggregator.execute(&op_data);
1743             tbb::task *xtask = op_data.ltask;
1744             last_task = combine_tasks(last_task, xtask);
1745         } while (op_data.status == SUCCEEDED);
1746         return last_task;
1747     }
1748
1749     //! Register successor
1750     virtual void internal_reg_succ(buffer_operation *op) {
1751         my_successors.register_successor(*(op->r));
1752         __TBB_store_with_release(op->status, SUCCEEDED);
1753     }
1754
1755     //! Remove successor
1756     virtual void internal_rem_succ(buffer_operation *op) {
1757         my_successors.remove_successor(*(op->r));
1758         __TBB_store_with_release(op->status, SUCCEEDED);
1759     }
1760
1761 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1762     virtual void internal_add_built_succ(buffer_operation *op) {
1763         my_successors.internal_add_built_successor(*(op->r));
1764         __TBB_store_with_release(op->status, SUCCEEDED);
1765     }
1766
1767     virtual void internal_del_built_succ(buffer_operation *op) {
1768         my_successors.internal_delete_built_successor(*(op->r));
1769         __TBB_store_with_release(op->status, SUCCEEDED);
1770     }
1771
1772     virtual void internal_add_built_pred(buffer_operation *op) {
1773         my_built_predecessors.add_edge(*(op->p));
1774         __TBB_store_with_release(op->status, SUCCEEDED);
1775     }
1776
1777     virtual void internal_del_built_pred(buffer_operation *op) {
1778         my_built_predecessors.delete_edge(*(op->p));
1779         __TBB_store_with_release(op->status, SUCCEEDED);
1780     }
1781
1782     virtual void internal_succ_cnt(buffer_operation *op) {
1783         op->cnt_val = my_successors.successor_count();
1784         __TBB_store_with_release(op->status, SUCCEEDED);
1785     }
1786
1787     virtual void internal_pred_cnt(buffer_operation *op) {
1788         op->cnt_val = my_built_predecessors.edge_count();
1789         __TBB_store_with_release(op->status, SUCCEEDED);
1790     }
1791
1792     virtual void internal_copy_succs(buffer_operation *op) {
1793         my_successors.copy_successors(*(op->svec));
1794         __TBB_store_with_release(op->status, SUCCEEDED);
1795     }
1796
1797     virtual void internal_copy_preds(buffer_operation *op) {
1798         my_built_predecessors.copy_edges(*(op->pvec));
1799         __TBB_store_with_release(op->status, SUCCEEDED);
1800     }
1801 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1802
1803     //! Tries to forward valid items to successors
1804     virtual void internal_forward_task(buffer_operation *op) {
1805         if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) {
1806             __TBB_store_with_release(op->status, FAILED);
1807             this->forwarder_busy = false;
1808             return;
1809         }
1810         T i_copy;
1811         task * last_task = NULL;
1812         size_type counter = my_successors.size();
1813         // Try forwarding, giving each successor a chance
1814         while (counter>0 && !this->buffer_empty() && this->my_item_valid(this->my_tail-1)) {
1815             this->copy_back(i_copy);
1816             task *new_task = my_successors.try_put_task(i_copy);
1817             if(new_task) {
1818                 last_task = combine_tasks(last_task, new_task);
1819                 this->destroy_back();
1820             }
1821             --counter;
1822         }
1823         op->ltask = last_task;  // return task
1824         if (last_task && !counter) {
1825             __TBB_store_with_release(op->status, SUCCEEDED);
1826         }
1827         else {
1828             __TBB_store_with_release(op->status, FAILED);
1829             forwarder_busy = false;
1830         }
1831     }
1832
1833     virtual void internal_push(buffer_operation *op) {
1834         this->push_back(*(op->elem));
1835         __TBB_store_with_release(op->status, SUCCEEDED);
1836     }
1837
1838     virtual void internal_pop(buffer_operation *op) {
1839         if(this->pop_back(*(op->elem))) {
1840             __TBB_store_with_release(op->status, SUCCEEDED);
1841         }
1842         else {
1843             __TBB_store_with_release(op->status, FAILED);
1844         }
1845     }
1846
1847     virtual void internal_reserve(buffer_operation *op) {
1848         if(this->reserve_front(*(op->elem))) {
1849             __TBB_store_with_release(op->status, SUCCEEDED);
1850         }
1851         else {
1852             __TBB_store_with_release(op->status, FAILED);
1853         }
1854     }
1855
1856     virtual void internal_consume(buffer_operation *op) {
1857         this->consume_front();
1858         __TBB_store_with_release(op->status, SUCCEEDED);
1859     }
1860
1861     virtual void internal_release(buffer_operation *op) {
1862         this->release_front();
1863         __TBB_store_with_release(op->status, SUCCEEDED);
1864     }
1865
1866 public:
1867     //! Constructor
1868     buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1869         forwarder_busy(false) {
1870         my_successors.set_owner(this);
1871         my_aggregator.initialize_handler(my_handler(this));
1872         tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1873                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1874     }
1875
1876     //! Copy constructor
1877     buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
1878         internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1879         forwarder_busy = false;
1880         my_successors.set_owner(this);
1881         my_aggregator.initialize_handler(my_handler(this));
1882         tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1883                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1884     }
1885
1886     virtual ~buffer_node() {}
1887
1888 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1889     /* override */ void set_name( const char *name ) {
1890         tbb::internal::fgt_node_desc( this, name );
1891     }
1892 #endif
1893
1894     //
1895     // message sender implementation
1896     //
1897
1898     //! Adds a new successor.
1899     /** Adds successor r to the list of successors; may forward tasks.  */
1900     /* override */ bool register_successor( successor_type &r ) {
1901         buffer_operation op_data(reg_succ);
1902         op_data.r = &r;
1903         my_aggregator.execute(&op_data);
1904         (void)enqueue_forwarding_task(op_data);
1905         return true;
1906     }
1907
1908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1909     /*override*/ void internal_add_built_successor( successor_type &r) {
1910         buffer_operation op_data(add_blt_succ);
1911         op_data.r = &r;
1912         my_aggregator.execute(&op_data);
1913     }
1914
1915     /*override*/ void internal_delete_built_successor( successor_type &r) {
1916         buffer_operation op_data(del_blt_succ);
1917         op_data.r = &r;
1918         my_aggregator.execute(&op_data);
1919     }
1920
1921     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1922         buffer_operation op_data(add_blt_pred);
1923         op_data.p = &p;
1924         my_aggregator.execute(&op_data);
1925     }
1926
1927     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1928         buffer_operation op_data(del_blt_pred);
1929         op_data.p = &p;
1930         my_aggregator.execute(&op_data);
1931     }
1932
1933     /*override*/ size_t predecessor_count() {
1934         buffer_operation op_data(blt_pred_cnt);
1935         my_aggregator.execute(&op_data);
1936         return op_data.cnt_val;
1937     }
1938
1939     /*override*/ size_t successor_count() {
1940         buffer_operation op_data(blt_succ_cnt);
1941         my_aggregator.execute(&op_data);
1942         return op_data.cnt_val;
1943     }
1944
1945     /*override*/ void copy_predecessors( predecessor_vector_type &v ) {
1946         buffer_operation op_data(blt_pred_cpy);
1947         op_data.pvec = &v;
1948         my_aggregator.execute(&op_data);
1949     }
1950
1951     /*override*/ void copy_successors( successor_vector_type &v ) {
1952         buffer_operation op_data(blt_succ_cpy);
1953         op_data.svec = &v;
1954         my_aggregator.execute(&op_data);
1955     }
1956 #endif
1957
1958     //! Removes a successor.
1959     /** Removes successor r from the list of successors.
1960         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1961     /* override */ bool remove_successor( successor_type &r ) {
1962         r.remove_predecessor(*this);
1963         buffer_operation op_data(rem_succ);
1964         op_data.r = &r;
1965         my_aggregator.execute(&op_data);
1966         // even though this operation does not cause a forward, if we are the handler, and
1967         // a forward is scheduled, we may be the first to reach this point after the aggregator,
1968         // and so should check for the task.
1969         (void)enqueue_forwarding_task(op_data);
1970         return true;
1971     }
1972
1973     //! Request an item from the buffer_node
1974     /**  true = v contains the returned item<BR>
1975          false = no item has been returned */
1976     /* override */ bool try_get( T &v ) {
1977         buffer_operation op_data(req_item);
1978         op_data.elem = &v;
1979         my_aggregator.execute(&op_data);
1980         (void)enqueue_forwarding_task(op_data);
1981         return (op_data.status==SUCCEEDED);
1982     }
1983
1984     //! Reserves an item.
1985     /**  false = no item can be reserved<BR>
1986          true = an item is reserved */
1987     /* override */ bool try_reserve( T &v ) {
1988         buffer_operation op_data(res_item);
1989         op_data.elem = &v;
1990         my_aggregator.execute(&op_data);
1991         (void)enqueue_forwarding_task(op_data);
1992         return (op_data.status==SUCCEEDED);
1993     }
1994
1995     //! Release a reserved item.
1996     /**  true = item has been released and so remains in sender */
1997     /* override */ bool try_release() {
1998         buffer_operation op_data(rel_res);
1999         my_aggregator.execute(&op_data);
2000         (void)enqueue_forwarding_task(op_data);
2001         return true;
2002     }
2003
2004     //! Consumes a reserved item.
2005     /** true = item is removed from sender and reservation removed */
2006     /* override */ bool try_consume() {
2007         buffer_operation op_data(con_res);
2008         my_aggregator.execute(&op_data);
2009         (void)enqueue_forwarding_task(op_data);
2010         return true;
2011     }
2012
2013 protected:
2014
2015     template< typename R, typename B > friend class run_and_put_task;
2016     template<typename X, typename Y> friend class internal::broadcast_cache;
2017     template<typename X, typename Y> friend class internal::round_robin_cache;
2018     //! receive an item, return a task *if possible
2019     /* override */ task *try_put_task(const T &t) {
2020         buffer_operation op_data(t, put_item);
2021         my_aggregator.execute(&op_data);
2022         task *ft = grab_forwarding_task(op_data);
2023         // sequencer_nodes can return failure (if an item has been previously inserted)
2024         // We have to spawn the returned task if our own operation fails.
2025
2026         if(ft && op_data.status == FAILED) {
2027             // we haven't succeeded queueing the item, but for some reason the
2028             // call returned a task (if another request resulted in a successful
2029             // forward this could happen.)  Queue the task and reset the pointer.
2030             FLOW_SPAWN(*ft); ft = NULL;
2031         }
2032         else if(!ft && op_data.status == SUCCEEDED) {
2033             ft = SUCCESSFULLY_ENQUEUED;
2034         }
2035         return ft;
2036     }
2037
2038     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2039         internal::reservable_item_buffer<T, A>::reset();
2040 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2041         my_successors.reset(f);
2042         if (f&rf_extract) {
2043             my_built_predecessors.receiver_extract(*this);
2044         }
2045 #endif
2046         forwarder_busy = false;
2047     }
2048
2049     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) { }
2050
2051 };  // buffer_node
2052
2053 //! Forwards messages in FIFO order
2054 template <typename T, typename A=cache_aligned_allocator<T> >
2055 class queue_node : public buffer_node<T, A> {
2056 protected:
2057     typedef buffer_node<T, A> base_type;
2058     typedef typename base_type::size_type size_type;
2059     typedef typename base_type::buffer_operation queue_operation;
2060
2061     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2062
2063     /* override */ void internal_forward_task(queue_operation *op) {
2064         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2065             __TBB_store_with_release(op->status, FAILED);
2066             this->forwarder_busy = false;
2067             return;
2068         }
2069         T i_copy;
2070         task *last_task = NULL;
2071         size_type counter = this->my_successors.size();
2072         // Keep trying to send items while there is at least one accepting successor
2073         while (counter>0 && this->my_item_valid(this->my_head)) {
2074             this->copy_front(i_copy);
2075             task *new_task = this->my_successors.try_put_task(i_copy);
2076             if(new_task) {
2077                 this->destroy_front();
2078                 last_task = combine_tasks(last_task, new_task);
2079             }
2080             --counter;
2081         }
2082         op->ltask = last_task;
2083         if (last_task && !counter)
2084             __TBB_store_with_release(op->status, SUCCEEDED);
2085         else {
2086             __TBB_store_with_release(op->status, FAILED);
2087             this->forwarder_busy = false;
2088         }
2089     }
2090
2091     /* override */ void internal_pop(queue_operation *op) {
2092         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2093             __TBB_store_with_release(op->status, FAILED);
2094         }
2095         else {
2096             this->pop_front(*(op->elem));
2097             __TBB_store_with_release(op->status, SUCCEEDED);
2098         }
2099     }
2100     /* override */ void internal_reserve(queue_operation *op) {
2101         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2102             __TBB_store_with_release(op->status, FAILED);
2103         }
2104         else {
2105             this->reserve_front(*(op->elem));
2106             __TBB_store_with_release(op->status, SUCCEEDED);
2107         }
2108     }
2109     /* override */ void internal_consume(queue_operation *op) {
2110         this->consume_front();
2111         __TBB_store_with_release(op->status, SUCCEEDED);
2112     }
2113
2114 public:
2115     typedef T input_type;
2116     typedef T output_type;
2117     typedef sender< input_type > predecessor_type;
2118     typedef receiver< output_type > successor_type;
2119
2120     //! Constructor
2121     queue_node( graph &g ) : base_type(g) {
2122         tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2123                                  static_cast<receiver<input_type> *>(this),
2124                                  static_cast<sender<output_type> *>(this) );
2125     }
2126
2127     //! Copy constructor
2128     queue_node( const queue_node& src) : base_type(src) {
2129         tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2130                                  static_cast<receiver<input_type> *>(this),
2131                                  static_cast<sender<output_type> *>(this) );
2132     }
2133
2134 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2135     /* override */ void set_name( const char *name ) {
2136         tbb::internal::fgt_node_desc( this, name );
2137     }
2138 #endif
2139
2140     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2141         base_type::reset(__TBB_PFG_RESET_ARG(f));
2142     }
2143 };  // queue_node
2144
2145 //! Forwards messages in sequence order
2146 template< typename T, typename A=cache_aligned_allocator<T> >
2147 class sequencer_node : public queue_node<T, A> {
2148     internal::function_body< T, size_t > *my_sequencer;
2149     // my_sequencer should be a benign function and must be callable
2150     // from a parallel context.  Does this mean it needn't be reset?
2151 public:
2152     typedef T input_type;
2153     typedef T output_type;
2154     typedef sender< input_type > predecessor_type;
2155     typedef receiver< output_type > successor_type;
2156
2157     //! Constructor
2158     template< typename Sequencer >
2159     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2160         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2161         tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2162                                  static_cast<receiver<input_type> *>(this),
2163                                  static_cast<sender<output_type> *>(this) );
2164     }
2165
2166     //! Copy constructor
2167     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2168         my_sequencer( src.my_sequencer->clone() ) {
2169         tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2170                                  static_cast<receiver<input_type> *>(this),
2171                                  static_cast<sender<output_type> *>(this) );
2172     }
2173
2174     //! Destructor
2175     ~sequencer_node() { delete my_sequencer; }
2176
2177 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2178     /* override */ void set_name( const char *name ) {
2179         tbb::internal::fgt_node_desc( this, name );
2180     }
2181 #endif
2182
2183 protected:
2184     typedef typename buffer_node<T, A>::size_type size_type;
2185     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2186
2187     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2188
2189 private:
2190     /* override */ void internal_push(sequencer_operation *op) {
2191         size_type tag = (*my_sequencer)(*(op->elem));
2192 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2193         if(tag < this->my_head) {
2194             // have already emitted a message with this tag
2195             __TBB_store_with_release(op->status, FAILED);
2196             return;
2197         }
2198 #endif
2199         // cannot modify this->my_tail now; the buffer would be inconsistent.
2200         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2201
2202         if(this->size(new_tail) > this->capacity()) {
2203             this->grow_my_array(this->size(new_tail));
2204         }
2205         this->my_tail = new_tail;
2206         if(this->place_item(tag,*(op->elem))) {
2207             __TBB_store_with_release(op->status, SUCCEEDED);
2208         }
2209         else {
2210             // already have a message with this tag
2211             __TBB_store_with_release(op->status, FAILED);
2212         }
2213     }
2214 };  // sequencer_node
2215
2216 //! Forwards messages in priority order
2217 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2218 class priority_queue_node : public buffer_node<T, A> {
2219 public:
2220     typedef T input_type;
2221     typedef T output_type;
2222     typedef buffer_node<T,A> base_type;
2223     typedef sender< input_type > predecessor_type;
2224     typedef receiver< output_type > successor_type;
2225
2226     //! Constructor
2227     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2228         tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2229                                  static_cast<receiver<input_type> *>(this),
2230                                  static_cast<sender<output_type> *>(this) );
2231     }
2232
2233     //! Copy constructor
2234     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2235         tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2236                                  static_cast<receiver<input_type> *>(this),
2237                                  static_cast<sender<output_type> *>(this) );
2238     }
2239
2240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2241     /* override */ void set_name( const char *name ) {
2242         tbb::internal::fgt_node_desc( this, name );
2243     }
2244 #endif
2245
2246
2247 protected:
2248
2249     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2250         mark = 0;
2251         base_type::reset(__TBB_PFG_RESET_ARG(f));
2252     }
2253
2254     typedef typename buffer_node<T, A>::size_type size_type;
2255     typedef typename buffer_node<T, A>::item_type item_type;
2256     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2257
2258     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2259
2260     /* override */ void handle_operations(prio_operation *op_list) {
2261         prio_operation *tmp = op_list /*, *pop_list*/ ;
2262         bool try_forwarding=false;
2263         while (op_list) {
2264             tmp = op_list;
2265             op_list = op_list->next;
2266             switch (tmp->type) {
2267             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
2268             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
2269             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
2270             case buffer_node<T, A>::try_fwd_task: internal_forward_task(tmp); break;
2271             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
2272             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
2273             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
2274             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
2275 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2276             case buffer_node<T, A>::add_blt_succ: this->internal_add_built_succ(tmp); break;
2277             case buffer_node<T, A>::del_blt_succ: this->internal_del_built_succ(tmp); break;
2278             case buffer_node<T, A>::add_blt_pred: this->internal_add_built_pred(tmp); break;
2279             case buffer_node<T, A>::del_blt_pred: this->internal_del_built_pred(tmp); break;
2280             case buffer_node<T, A>::blt_succ_cnt: this->internal_succ_cnt(tmp); break;
2281             case buffer_node<T, A>::blt_pred_cnt: this->internal_pred_cnt(tmp); break;
2282             case buffer_node<T, A>::blt_succ_cpy: this->internal_copy_succs(tmp); break;
2283             case buffer_node<T, A>::blt_pred_cpy: this->internal_copy_preds(tmp); break;
2284 #endif
2285             }
2286         }
2287         // process pops!  for now, no special pop processing
2288         // concurrent_priority_queue handles pushes first, then pops.
2289         // that is the genesis of this comment
2290         if (mark<this->my_tail) heapify();
2291         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2292         if (try_forwarding && !this->forwarder_busy) {  // could we also test for this->my_tail (queue non-empty)?
2293             task* tp = this->my_graph.root_task();
2294             if(tp) {
2295                 this->forwarder_busy = true;
2296                 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
2297                         forward_task_bypass
2298                         < buffer_node<input_type, A> >(*this);
2299                 // tmp should point to the last item handled by the aggregator.  This is the operation
2300                 // the handling thread enqueued.  So modifying that record will be okay.
2301                 tbb::task *tmp1 = tmp->ltask;
2302                 tmp->ltask = combine_tasks(tmp1, new_task);
2303             }
2304         }
2305     }
2306
2307     //! Tries to forward valid items to successors
2308     /* override */ void internal_forward_task(prio_operation *op) {
2309         T i_copy;
2310         task * last_task = NULL; // flagged when a successor accepts
2311         size_type counter = this->my_successors.size();
2312
2313         if (this->my_reserved || this->my_tail == 0) {
2314             __TBB_store_with_release(op->status, FAILED);
2315             this->forwarder_busy = false;
2316             return;
2317         }
2318         // Keep trying to send while there exists an accepting successor
2319         while (counter>0 && this->my_tail > 0) {
2320             prio_copy(i_copy);
2321             task * new_task = this->my_successors.try_put_task(i_copy);
2322             if ( new_task ) {
2323                 last_task = combine_tasks(last_task, new_task);
2324                 prio_pop();
2325             }
2326             --counter;
2327         }
2328         op->ltask = last_task;
2329         if (last_task && !counter)
2330             __TBB_store_with_release(op->status, SUCCEEDED);
2331         else {
2332             __TBB_store_with_release(op->status, FAILED);
2333             this->forwarder_busy = false;
2334         }
2335     }
2336
2337     /* override */ void internal_push(prio_operation *op) {
2338         prio_push(*(op->elem));
2339         __TBB_store_with_release(op->status, SUCCEEDED);
2340     }
2341
2342     /* override */ void internal_pop(prio_operation *op) {
2343         // if empty or already reserved, don't pop
2344         if ( this->my_reserved == true || this->my_tail == 0 ) {
2345             __TBB_store_with_release(op->status, FAILED);
2346             return;
2347         }
2348
2349         prio_copy(*(op->elem));
2350         __TBB_store_with_release(op->status, SUCCEEDED);
2351         prio_pop();
2352
2353     }
2354
2355     // pops the highest-priority item, saves copy
2356     /* override */ void internal_reserve(prio_operation *op) {
2357         if (this->my_reserved == true || this->my_tail == 0) {
2358             __TBB_store_with_release(op->status, FAILED);
2359             return;
2360         }
2361         this->my_reserved = true;
2362         prio_copy(*(op->elem));
2363         reserved_item = *(op->elem);
2364         __TBB_store_with_release(op->status, SUCCEEDED);
2365         prio_pop();
2366     }
2367
2368     /* override */ void internal_consume(prio_operation *op) {
2369         __TBB_store_with_release(op->status, SUCCEEDED);
2370         this->my_reserved = false;
2371         reserved_item = input_type();
2372     }
2373
2374     /* override */ void internal_release(prio_operation *op) {
2375         __TBB_store_with_release(op->status, SUCCEEDED);
2376         prio_push(reserved_item);
2377         this->my_reserved = false;
2378         reserved_item = input_type();
2379     }
2380 private:
2381     Compare compare;
2382     size_type mark;
2383
2384     input_type reserved_item;
2385
2386     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2387     bool prio_use_tail() {
2388         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2389         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2390     }
2391
2392     // prio_push: checks that the item will fit, expand array if necessary, put at end
2393     void prio_push(const T &src) {
2394         if ( this->my_tail >= this->my_array_size )
2395             this->grow_my_array( this->my_tail + 1 );
2396         (void) this->place_item(this->my_tail, src);
2397         ++(this->my_tail);
2398         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2399     }
2400
2401     // prio_pop: deletes highest priority item from the array, and if it is item
2402     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
2403     // and mark.  Assumes the array has already been tested for emptiness; no failure.
2404     void prio_pop()  {
2405         if (prio_use_tail()) {
2406             // there are newly pushed elems; last one higher than top
2407             // copy the data
2408             this->destroy_item(this->my_tail-1);
2409             --(this->my_tail);
2410             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2411             return;
2412         }
2413         this->destroy_item(0);
2414         if(this->my_tail > 1) {
2415             // push the last element down heap
2416             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2417             this->move_item(0,this->my_tail - 1);
2418         }
2419         --(this->my_tail);
2420         if(mark > this->my_tail) --mark;
2421         if (this->my_tail > 1) // don't reheap for heap of size 1
2422             reheap();
2423         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2424     }
2425
2426     void prio_copy(T &res) {
2427         if (prio_use_tail()) {
2428             res = this->get_my_item(this->my_tail - 1);
2429         }
2430         else {
2431             res = this->get_my_item(0);
2432         }
2433     }
2434
2435     // turn array into heap
2436     void heapify() {
2437         if(this->my_tail == 0) {
2438             mark = 0;
2439             return;
2440         }
2441         if (!mark) mark = 1;
2442         for (; mark<this->my_tail; ++mark) { // for each unheaped element
2443             size_type cur_pos = mark;
2444             input_type to_place;
2445             this->fetch_item(mark,to_place);
2446             do { // push to_place up the heap
2447                 size_type parent = (cur_pos-1)>>1;
2448                 if (!compare(this->get_my_item(parent), to_place))
2449                     break;
2450                 this->move_item(cur_pos, parent);
2451                 cur_pos = parent;
2452             } while( cur_pos );
2453             (void) this->place_item(cur_pos, to_place);
2454         }
2455     }
2456
2457     // otherwise heapified array with new root element; rearrange to heap
2458     void reheap() {
2459         size_type cur_pos=0, child=1;
2460         while (child < mark) {
2461             size_type target = child;
2462             if (child+1<mark &&
2463                 compare(this->get_my_item(child),
2464                         this->get_my_item(child+1)))
2465                 ++target;
2466             // target now has the higher priority child
2467             if (compare(this->get_my_item(target),
2468                         this->get_my_item(cur_pos)))
2469                 break;
2470             // swap
2471             this->swap_items(cur_pos, target);
2472             cur_pos = target;
2473             child = (cur_pos<<1)+1;
2474         }
2475     }
2476 };  // priority_queue_node
2477
2478 //! Forwards messages only if the threshold has not been reached
2479 /** This node forwards items until its threshold is reached.
2480     It contains no buffering.  If the downstream node rejects, the
2481     message is dropped. */
2482 template< typename T >
2483 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2484 protected:
2485     using graph_node::my_graph;
2486 public:
2487     typedef T input_type;
2488     typedef T output_type;
2489     typedef sender< input_type > predecessor_type;
2490     typedef receiver< output_type > successor_type;
2491 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2492     typedef std::vector<successor_type *> successor_vector_type;
2493     typedef std::vector<predecessor_type *> predecessor_vector_type;
2494 #endif
2495
2496 private:
2497     size_t my_threshold;
2498     size_t my_count; //number of successful puts
2499     size_t my_tries; //number of active put attempts
2500     internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2501     spin_mutex my_mutex;
2502     internal::broadcast_cache< T > my_successors;
2503     int init_decrement_predecessors;
2504
2505     friend class internal::forward_task_bypass< limiter_node<T> >;
2506
2507     // Let decrementer call decrement_counter()
2508     friend class internal::decrementer< limiter_node<T> >;
2509
2510     bool check_conditions() {  // always called under lock
2511         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2512     }
2513
2514     // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2515     task *forward_task() {
2516         input_type v;
2517         task *rval = NULL;
2518         bool reserved = false;
2519             {
2520                 spin_mutex::scoped_lock lock(my_mutex);
2521                 if ( check_conditions() )
2522                     ++my_tries;
2523                 else
2524                     return NULL;
2525             }
2526
2527         //SUCCESS 
2528         // if we can reserve and can put, we consume the reservation 
2529         // we increment the count and decrement the tries
2530         if ( (my_predecessors.try_reserve(v)) == true ){
2531             reserved=true;
2532             if ( (rval = my_successors.try_put_task(v)) != NULL ){
2533                 {
2534                     spin_mutex::scoped_lock lock(my_mutex);
2535                     ++my_count;
2536                     --my_tries;
2537                     my_predecessors.try_consume();
2538                     if ( check_conditions() ) {
2539                         task* tp = this->my_graph.root_task();
2540                         if ( tp ) {
2541                             task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2542                                 internal::forward_task_bypass< limiter_node<T> >( *this );
2543                             FLOW_SPAWN (*rtask);
2544                         }
2545                     }
2546                 }
2547                 return rval;
2548             }
2549         }
2550         //FAILURE
2551         //if we can't reserve, we decrement the tries
2552         //if we can reserve but can't put, we decrement the tries and release the reservation
2553         {
2554             spin_mutex::scoped_lock lock(my_mutex);
2555             --my_tries;
2556             if (reserved) my_predecessors.try_release();
2557             if ( check_conditions() ) {
2558                 task* tp = this->my_graph.root_task();
2559                 if ( tp ) {
2560                     task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2561                         internal::forward_task_bypass< limiter_node<T> >( *this );
2562                     __TBB_ASSERT(!rval, "Have two tasks to handle");
2563                     return rtask;
2564                 }
2565             }
2566             return rval;
2567         }
2568     }
2569
2570     void forward() {
2571         __TBB_ASSERT(false, "Should never be called");
2572         return;
2573     }
2574
2575     task * decrement_counter() {
2576         {
2577             spin_mutex::scoped_lock lock(my_mutex);
2578             if(my_count) --my_count;
2579         }
2580         return forward_task();
2581     }
2582
2583 public:
2584     //! The internal receiver< continue_msg > that decrements the count
2585     internal::decrementer< limiter_node<T> > decrement;
2586
2587     //! Constructor
2588     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
2589         graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2590         init_decrement_predecessors(num_decrement_predecessors),
2591         decrement(num_decrement_predecessors)
2592     {
2593         my_predecessors.set_owner(this);
2594         my_successors.set_owner(this);
2595         decrement.set_owner(this);
2596         tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2597                                  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2598                                  static_cast<sender<output_type> *>(this) );
2599     }
2600
2601     //! Copy constructor
2602     limiter_node( const limiter_node& src ) :
2603         graph_node(src.my_graph), receiver<T>(), sender<T>(),
2604         my_threshold(src.my_threshold), my_count(0), my_tries(0),
2605         init_decrement_predecessors(src.init_decrement_predecessors),
2606         decrement(src.init_decrement_predecessors)
2607     {
2608         my_predecessors.set_owner(this);
2609         my_successors.set_owner(this);
2610         decrement.set_owner(this);
2611         tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2612                                  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2613                                  static_cast<sender<output_type> *>(this) );
2614     }
2615
2616 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2617     /* override */ void set_name( const char *name ) {
2618         tbb::internal::fgt_node_desc( this, name );
2619     }
2620 #endif
2621
2622     //! Replace the current successor with this new successor
2623     /* override */ bool register_successor( receiver<output_type> &r ) {
2624         spin_mutex::scoped_lock lock(my_mutex);
2625         bool was_empty = my_successors.empty();
2626         my_successors.register_successor(r);
2627         //spawn a forward task if this is the only successor
2628         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2629             task* tp = this->my_graph.root_task();
2630             if ( tp ) {
2631                 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2632                             internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2633             }
2634         }
2635         return true;
2636     }
2637
2638     //! Removes a successor from this node
2639     /** r.remove_predecessor(*this) is also called. */
2640     /* override */ bool remove_successor( receiver<output_type> &r ) {
2641         r.remove_predecessor(*this);
2642         my_successors.remove_successor(r);
2643         return true;
2644     }
2645
2646 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2647     /*override*/void internal_add_built_successor(receiver<output_type> &src) {
2648         my_successors.internal_add_built_successor(src);
2649     }
2650
2651     /*override*/void internal_delete_built_successor(receiver<output_type> &src) {
2652         my_successors.internal_delete_built_successor(src);
2653     }
2654
2655     /*override*/size_t successor_count() { return my_successors.successor_count(); }
2656
2657     /*override*/ void copy_successors(successor_vector_type &v) {
2658         my_successors.copy_successors(v);
2659     }
2660
2661     /*override*/void internal_add_built_predecessor(sender<output_type> &src) {
2662         my_predecessors.internal_add_built_predecessor(src);
2663     }
2664
2665     /*override*/void internal_delete_built_predecessor(sender<output_type> &src) {
2666         my_predecessors.internal_delete_built_predecessor(src);
2667     }
2668
2669     /*override*/size_t predecessor_count() { return my_predecessors.predecessor_count(); }
2670
2671     /*override*/ void copy_predecessors(predecessor_vector_type &v) {
2672         my_predecessors.copy_predecessors(v);
2673     }
2674 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2675
2676     //! Adds src to the list of cached predecessors.
2677     /* override */ bool register_predecessor( predecessor_type &src ) {
2678         spin_mutex::scoped_lock lock(my_mutex);
2679         my_predecessors.add( src );
2680         task* tp = this->my_graph.root_task();
2681         if ( my_count + my_tries < my_threshold && !my_successors.empty() && tp ) {
2682             FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2683                         internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2684         }
2685         return true;
2686     }
2687
2688     //! Removes src from the list of cached predecessors.
2689     /* override */ bool remove_predecessor( predecessor_type &src ) {
2690         my_predecessors.remove( src );
2691         return true;
2692     }
2693
2694 protected:
2695
2696     template< typename R, typename B > friend class run_and_put_task;
2697     template<typename X, typename Y> friend class internal::broadcast_cache;
2698     template<typename X, typename Y> friend class internal::round_robin_cache;
2699     //! Puts an item to this receiver
2700     /* override */ task *try_put_task( const T &t ) {
2701         {
2702             spin_mutex::scoped_lock lock(my_mutex);
2703             if ( my_count + my_tries >= my_threshold )
2704                 return NULL;
2705             else
2706                 ++my_tries;
2707         }
2708
2709         task * rtask = my_successors.try_put_task(t);
2710
2711         if ( !rtask ) {  // try_put_task failed.
2712             spin_mutex::scoped_lock lock(my_mutex);
2713             --my_tries;
2714             task* tp = this->my_graph.root_task();
2715             if ( check_conditions() && tp ) {
2716                 rtask = new ( task::allocate_additional_child_of( *tp ) )
2717                     internal::forward_task_bypass< limiter_node<T> >( *this );
2718             }
2719         }
2720         else {
2721             spin_mutex::scoped_lock lock(my_mutex);
2722             ++my_count;
2723             --my_tries;
2724              }
2725         return rtask;
2726     }
2727
2728     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2729         my_count = 0;
2730         my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
2731         decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
2732 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2733         my_successors.reset(f);
2734 #endif
2735     }
2736
2737     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); }
2738 };  // limiter_node
2739
2740 #include "internal/_flow_graph_join_impl.h"
2741
2742 using internal::reserving_port;
2743 using internal::queueing_port;
2744 using internal::tag_matching_port;
2745 using internal::input_port;
2746 using internal::tag_value;
2747 using internal::NO_TAG;
2748
2749 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
2750
2751 template<typename OutputTuple>
2752 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2753 private:
2754     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2755     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2756 public:
2757     typedef OutputTuple output_type;
2758     typedef typename unfolded_type::input_ports_type input_ports_type;
2759     join_node(graph &g) : unfolded_type(g) {
2760         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2761                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2762     }
2763     join_node(const join_node &other) : unfolded_type(other) {
2764         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2765                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2766     }
2767
2768 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2769     /* override */ void set_name( const char *name ) {
2770         tbb::internal::fgt_node_desc( this, name );
2771     }
2772 #endif
2773
2774 };
2775
2776 template<typename OutputTuple>
2777 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2778 private:
2779     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2780     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2781 public:
2782     typedef OutputTuple output_type;
2783     typedef typename unfolded_type::input_ports_type input_ports_type;
2784     join_node(graph &g) : unfolded_type(g) {
2785         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2786                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2787     }
2788     join_node(const join_node &other) : unfolded_type(other) {
2789         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2790                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2791     }
2792
2793 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2794     /* override */ void set_name( const char *name ) {
2795         tbb::internal::fgt_node_desc( this, name );
2796     }
2797 #endif
2798
2799 };
2800
2801 // template for tag_matching join_node
2802 template<typename OutputTuple>
2803 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2804       tag_matching_port, OutputTuple, tag_matching> {
2805 private:
2806     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2807     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
2808 public:
2809     typedef OutputTuple output_type;
2810     typedef typename unfolded_type::input_ports_type input_ports_type;
2811
2812     template<typename __TBB_B0, typename __TBB_B1>
2813     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2814         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2815                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2816     }
2817     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2818     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2819         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2820                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2821     }
2822     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2823     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2824         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2825                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2826     }
2827     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2828     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2829             unfolded_type(g, b0, b1, b2, b3, b4) {
2830         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2831                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2832     }
2833 #if __TBB_VARIADIC_MAX >= 6
2834     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2835         typename __TBB_B5>
2836     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2837             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2838         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2839                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2840     }
2841 #endif
2842 #if __TBB_VARIADIC_MAX >= 7
2843     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2844         typename __TBB_B5, typename __TBB_B6>
2845     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2846             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2847         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2848                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2849     }
2850 #endif
2851 #if __TBB_VARIADIC_MAX >= 8
2852     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2853         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2854     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2855             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2856         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2857                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2858     }
2859 #endif
2860 #if __TBB_VARIADIC_MAX >= 9
2861     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2862         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2863     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2864             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2865         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2866                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2867     }
2868 #endif
2869 #if __TBB_VARIADIC_MAX >= 10
2870     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2871         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2872     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2873             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2874         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2875                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2876     }
2877 #endif
2878     join_node(const join_node &other) : unfolded_type(other) {
2879         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2880                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2881     }
2882
2883 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2884     /* override */ void set_name( const char *name ) {
2885         tbb::internal::fgt_node_desc( this, name );
2886     }
2887 #endif
2888
2889 };
2890
2891 // indexer node
2892 #include "internal/_flow_graph_indexer_impl.h"
2893
2894 struct indexer_null_type {};
2895
2896 template<typename T0, typename T1=indexer_null_type, typename T2=indexer_null_type, typename T3=indexer_null_type,
2897                       typename T4=indexer_null_type, typename T5=indexer_null_type, typename T6=indexer_null_type,
2898                       typename T7=indexer_null_type, typename T8=indexer_null_type, typename T9=indexer_null_type> class indexer_node;
2899
2900 //indexer node specializations
2901 template<typename T0>
2902 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2903 private:
2904     static const int N = 1;
2905 public:
2906     typedef tuple<T0> InputTuple;
2907     typedef typename internal::tagged_msg<size_t, T0> output_type;
2908     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2909     indexer_node(graph& g) : unfolded_type(g) {
2910         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2911                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2912     }
2913     // Copy constructor
2914     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2915         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2916                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2917     }
2918
2919 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2920      void set_name( const char *name ) {
2921         tbb::internal::fgt_node_desc( this, name );
2922     }
2923 #endif
2924 };
2925
2926 template<typename T0, typename T1>
2927 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2928 private:
2929     static const int N = 2;
2930 public:
2931     typedef tuple<T0, T1> InputTuple;
2932     typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
2933     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2934     indexer_node(graph& g) : unfolded_type(g) {
2935         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2936                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2937     }
2938     // Copy constructor
2939     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2940         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2941                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2942     }
2943
2944 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2945      void set_name( const char *name ) {
2946         tbb::internal::fgt_node_desc( this, name );
2947     }
2948 #endif
2949 };
2950
2951 template<typename T0, typename T1, typename T2>
2952 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2953 private:
2954     static const int N = 3;
2955 public:
2956     typedef tuple<T0, T1, T2> InputTuple;
2957     typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
2958     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2959     indexer_node(graph& g) : unfolded_type(g) {
2960         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2961                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2962     }
2963     // Copy constructor
2964     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2965         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2966                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2967     }
2968
2969 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2970         void set_name( const char *name ) {
2971         tbb::internal::fgt_node_desc( this, name );
2972     }
2973 #endif
2974 };
2975
2976 template<typename T0, typename T1, typename T2, typename T3>
2977 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2978 private:
2979     static const int N = 4;
2980 public:
2981     typedef tuple<T0, T1, T2, T3> InputTuple;
2982     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
2983     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2984     indexer_node(graph& g) : unfolded_type(g) {
2985         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2986                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2987     }
2988     // Copy constructor
2989     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2990         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2991                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2992     }
2993
2994 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2995     /* override */ void set_name( const char *name ) {
2996         tbb::internal::fgt_node_desc( this, name );
2997     }
2998 #endif
2999 };
3000
3001 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3002 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3003 private:
3004     static const int N = 5;
3005 public:
3006     typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3007     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
3008     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3009     indexer_node(graph& g) : unfolded_type(g) {
3010         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3011                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3012     }
3013     // Copy constructor
3014     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3015         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3016                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3017     }
3018
3019 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3020     /* override */ void set_name( const char *name ) {
3021         tbb::internal::fgt_node_desc( this, name );
3022     }
3023 #endif
3024 };
3025
3026 #if __TBB_VARIADIC_MAX >= 6
3027 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3028 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3029 private:
3030     static const int N = 6;
3031 public:
3032     typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3033     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3034     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3035     indexer_node(graph& g) : unfolded_type(g) {
3036         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3037                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3038     }
3039     // Copy constructor
3040     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3041         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3042                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3043     }
3044
3045 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3046     /* override */ void set_name( const char *name ) {
3047         tbb::internal::fgt_node_desc( this, name );
3048     }
3049 #endif
3050 };
3051 #endif //variadic max 6
3052
3053 #if __TBB_VARIADIC_MAX >= 7
3054 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3055          typename T6>
3056 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3057 private:
3058     static const int N = 7;
3059 public:
3060     typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3061     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3062     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3063     indexer_node(graph& g) : unfolded_type(g) {
3064         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3065                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3066     }
3067     // Copy constructor
3068     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3069         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3070                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3071     }
3072
3073 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3074     /* override */ void set_name( const char *name ) {
3075         tbb::internal::fgt_node_desc( this, name );
3076     }
3077 #endif
3078 };
3079 #endif //variadic max 7
3080
3081 #if __TBB_VARIADIC_MAX >= 8
3082 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3083          typename T6, typename T7>
3084 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3085 private:
3086     static const int N = 8;
3087 public:
3088     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3089     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
3090     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3091     indexer_node(graph& g) : unfolded_type(g) {
3092         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3093                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3094     }
3095     // Copy constructor
3096     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3097         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3098                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3099     }
3100
3101 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3102     /* override */ void set_name( const char *name ) {
3103         tbb::internal::fgt_node_desc( this, name );
3104     }
3105 #endif
3106 };
3107 #endif //variadic max 8
3108
3109 #if __TBB_VARIADIC_MAX >= 9
3110 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3111          typename T6, typename T7, typename T8>
3112 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3113 private:
3114     static const int N = 9;
3115 public:
3116     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3117     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
3118     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3119     indexer_node(graph& g) : unfolded_type(g) {
3120         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3121                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3122     }
3123     // Copy constructor
3124     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3125         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3126                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3127     }
3128
3129 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3130     /* override */ void set_name( const char *name ) {
3131         tbb::internal::fgt_node_desc( this, name );
3132     }
3133 #endif
3134 };
3135 #endif //variadic max 9
3136
3137 #if __TBB_VARIADIC_MAX >= 10
3138 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3139          typename T6, typename T7, typename T8, typename T9>
3140 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3141 private:
3142     static const int N = 10;
3143 public:
3144     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3145     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3146     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3147     indexer_node(graph& g) : unfolded_type(g) {
3148         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3149                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3150     }
3151     // Copy constructor
3152     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3153         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3154                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3155     }
3156
3157 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3158     /* override */ void set_name( const char *name ) {
3159         tbb::internal::fgt_node_desc( this, name );
3160     }
3161 #endif
3162 };
3163 #endif //variadic max 10
3164
3165 //! Makes an edge between a single predecessor and a single successor
3166 template< typename T >
3167 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3168 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3169     s.internal_add_built_predecessor(p);
3170     p.internal_add_built_successor(s);
3171 #endif
3172     p.register_successor( s );
3173     tbb::internal::fgt_make_edge( &p, &s );
3174 }
3175
3176 //! Makes an edge between a single predecessor and a single successor
3177 template< typename T >
3178 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3179     p.remove_successor( s );
3180 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3181     // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3182     p.internal_delete_built_successor(s);
3183     s.internal_delete_built_predecessor(p);
3184 #endif
3185     tbb::internal::fgt_remove_edge( &p, &s );
3186 }
3187
3188 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3189 template<typename C >
3190 template< typename S >
3191 void edge_container<C>::sender_extract( S &s ) {
3192     edge_vector e = built_edges;
3193     for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3194         remove_edge(s, **i);
3195     }
3196 }
3197
3198 template<typename C >
3199 template< typename R >
3200 void edge_container<C>::receiver_extract( R &r ) {
3201     edge_vector e = built_edges;
3202     for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3203         remove_edge(**i, r);
3204     }
3205 }
3206 #endif
3207
3208 //! Returns a copy of the body from a function or continue node
3209 template< typename Body, typename Node >
3210 Body copy_body( Node &n ) {
3211     return n.template copy_function_object<Body>();
3212 }
3213
3214 } // interface7
3215
3216 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3217     using interface7::reset_flags;
3218     using interface7::rf_reset_protocol;
3219     using interface7::rf_reset_bodies;
3220     using interface7::rf_extract;
3221 #endif
3222
3223     using interface7::graph;
3224     using interface7::graph_node;
3225     using interface7::continue_msg;
3226     using interface7::sender;
3227     using interface7::receiver;
3228     using interface7::continue_receiver;
3229
3230     using interface7::source_node;
3231     using interface7::function_node;
3232     using interface7::multifunction_node;
3233     using interface7::split_node;
3234     using interface7::internal::output_port;
3235     using interface7::indexer_node;
3236     using interface7::internal::tagged_msg;
3237     using interface7::internal::cast_to;
3238     using interface7::internal::is_a;
3239     using interface7::continue_node;
3240     using interface7::overwrite_node;
3241     using interface7::write_once_node;
3242     using interface7::broadcast_node;
3243     using interface7::buffer_node;
3244     using interface7::queue_node;
3245     using interface7::sequencer_node;
3246     using interface7::priority_queue_node;
3247     using interface7::limiter_node;
3248     using namespace interface7::internal::graph_policy_namespace;
3249     using interface7::join_node;
3250     using interface7::input_port;
3251     using interface7::copy_body;
3252     using interface7::make_edge;
3253     using interface7::remove_edge;
3254     using interface7::internal::NO_TAG;
3255     using interface7::internal::tag_value;
3256
3257 } // flow
3258 } // tbb
3259
3260 #undef __TBB_PFG_RESET_ARG
3261 #undef __TBB_COMMA
3262
3263 #endif // __TBB_flow_graph_H