2 Copyright 2005-2014 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5 you can redistribute it and/or modify it under the terms of the GNU General Public License
6 version 2 as published by the Free Software Foundation. Threading Building Blocks is
7 distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9 See the GNU General Public License for more details. You should have received a copy of
10 the GNU General Public License along with Threading Building Blocks; if not, write to the
11 Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
13 As a special exception, you may use this file as part of a free software library without
14 restriction. Specifically, if other files instantiate templates or use macros or inline
15 functions from this file, or you compile this file and link it with other files to produce
16 an executable, this file does not by itself cause the resulting executable to be covered
17 by the GNU General Public License. This exception does not however invalidate any other
18 reasons why the executable file might be covered by the GNU General Public License.
21 #ifndef __TBB_flow_graph_H
22 #define __TBB_flow_graph_H
24 #include "tbb_stddef.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
31 #include "cache_aligned_allocator.h"
32 #include "tbb_exception.h"
33 #include "internal/_aggregator_impl.h"
34 #include "tbb_profiling.h"
// FLOW_SPAWN abstracts how the flow graph submits a task to the scheduler:
// enqueue() (FIFO-ish, deprecated behavior) vs spawn() (LIFO, work-stealing).
#if TBB_DEPRECATED_FLOW_ENQUEUE
#define FLOW_SPAWN(a) tbb::task::enqueue((a))
// NOTE(review): the '#else' / '#endif' lines between the two definitions are
// elided in this excerpt; the second define is the non-deprecated branch.
#define FLOW_SPAWN(a) tbb::task::spawn((a))
// use the VC10 or gcc version of tuple if it is available.
#if __TBB_CPP11_TUPLE_PRESENT
using std::tuple_size;
using std::tuple_element;
// Fallback tuple implementation when the standard one is unavailable.
#include "compat/tuple"
61 \brief The graph related classes and functions
63 There are some applications that best express dependencies as messages
64 passed between nodes in a graph. These messages may contain data or
simply act as signals that a predecessor has completed. The graph
66 class and its associated node classes can be used to express such
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
// Used by node constructors: 'unlimited' places no limit on simultaneous body
// invocations; 'serial' allows at most one invocation at a time.
enum concurrency { unlimited = 0, serial = 1 };
// Versioned public namespace of the flow graph API.
namespace interface7 {
// Forward declarations of edge-cache helpers (referenced later as
// internal::broadcast_cache etc. — the enclosing 'namespace internal'
// lines appear to be elided in this excerpt).
template<typename T, typename M> class successor_cache;
template<typename T, typename M> class broadcast_cache;
template<typename T, typename M> class round_robin_cache;
//! An empty class used for messages that mean "I'm done"
class continue_msg {};
// Forward declarations of the core message-passing interfaces.
template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;
//! Pure virtual template class that defines a sender of messages of type T
template< typename T >
// NOTE(review): the 'class sender {' header line is elided in this excerpt;
// the members below belong to the sender<T> interface.
//! The output type of this sender
typedef T output_type;
//! The successor type for this node
typedef receiver<T> successor_type;
//! Add a new successor to this node
virtual bool register_successor( successor_type &r ) = 0;
//! Removes a successor from this node
virtual bool remove_successor( successor_type &r ) = 0;
//! Request an item from the sender
// Defaults return false: a plain sender has nothing to get/reserve;
// buffering senders override these.
virtual bool try_get( T & ) { return false; }
//! Reserves an item in the sender
virtual bool try_reserve( T & ) { return false; }
//! Releases the reserved item
virtual bool try_release( ) { return false; }
//! Consumes the reserved item
virtual bool try_consume( ) { return false; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! interface to record edges for traversal & deletion
virtual void internal_add_built_successor( successor_type & ) = 0;
virtual void internal_delete_built_successor( successor_type & ) = 0;
virtual void copy_successors( std::vector<successor_type *> &) = 0;
virtual size_t successor_count() = 0;
template< typename T > class limiter_node; // needed for resetting decrementer
template< typename R, typename B > class run_and_put_task;
// Sentinel task pointer: distinguishes "message was already enqueued" from a
// real task that the caller is expected to spawn. Never dereferenced.
static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// flags to modify the behavior of the graph reset(). Can be combined.
// NOTE(review): the 'enum reset_flags {' opener and closing lines are elided
// in this excerpt; these are its enumerators.
rf_reset_protocol = 0,
rf_reset_bodies = 1<<0, // delete the current node body, reset to a copy of the initial node body.
rf_extract = 1<<1 // delete edges (extract() for single node, reset() for graph.)
// When preview features are on, reset() signatures carry a reset_flags arg.
#define __TBB_PFG_RESET_ARG(exp) exp
#define __TBB_COMMA ,
// NOTE(review): the '#else' between the two macro pairs is elided; this pair
// is the preview-features-off branch (the argument compiles away).
#define __TBB_PFG_RESET_ARG(exp) /* nothing */
#define __TBB_COMMA /* nothing */
// enqueue left task if necessary. Returns the non-enqueued task if there is one.
// Merges two "task to run next" results: NULL means "nothing", the
// SUCCESSFULLY_ENQUEUED sentinel means "already handled", anything else is a
// runnable task.
static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
// if no RHS task, don't change left.
if(right == NULL) return left;
// right is a task
if(left == NULL) return right;
if(left == SUCCESSFULLY_ENQUEUED) return right;
// left contains a task
if(right != SUCCESSFULLY_ENQUEUED) {
// both are valid tasks
// NOTE(review): the remainder of this function (spawning one of the two
// tasks and the closing braces) is elided in this excerpt.
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
// NOTE(review): the 'class receiver {' header line is elided in this excerpt;
// the members below belong to the receiver<T> interface.
//! The input type of this receiver
typedef T input_type;
//! The predecessor type for this node
typedef sender<T> predecessor_type;
virtual ~receiver() {}
//! Put an item to the receiver
// Wraps try_put_task: NULL means rejection; the sentinel means the message
// was already handled; any other task is spawned here.
bool try_put( const T& t ) {
task *res = try_put_task(t);
if(!res) return false;
if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
// NOTE(review): the trailing 'return true;' and closing brace are elided
// in this excerpt.
//! put item to successor; return task to run the successor if possible.
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache;
template<typename X, typename Y> friend class internal::round_robin_cache;
// Core hook implemented by every concrete receiver.
virtual task *try_put_task(const T& t) = 0;
//! Add a predecessor to the node
virtual bool register_predecessor( predecessor_type & ) { return false; }
//! Remove a predecessor from the node
virtual bool remove_predecessor( predecessor_type & ) { return false; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Edge bookkeeping used by extract()/reset() in the preview API.
virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
virtual void copy_predecessors( std::vector<predecessor_type *> & ) = 0;
virtual size_t predecessor_count() = 0;
//! put receiver back in initial state
template<typename U> friend class limiter_node;
virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol ) ) = 0;
template<typename TT, typename M>
friend class internal::successor_cache;
// successor_cache treats continue_receivers specially; default is "no".
virtual bool is_continue_receiver() { return false; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//* holder of edges both for caches and for those nodes which do not have predecessor caches.
// C == receiver< ... > or sender< ... >, depending.
// NOTE(review): the 'template<typename C>' line is elided in this excerpt.
class edge_container {
typedef std::vector<C *> edge_vector;
// Record one edge endpoint (stored by address, not owned).
void add_edge( C &s) {
built_edges.push_back( &s );
// Remove the first recorded edge matching s.
void delete_edge( C &s) {
for ( typename edge_vector::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
// NOTE(review): the 'if (*i == &s)' comparison line is elided in this
// excerpt — in the full file the erase is conditional on a match.
(void)built_edges.erase(i);
return; // only remove one predecessor per request
// Copy recorded edges into v (body elided in this excerpt).
void copy_edges( edge_vector &v) {
size_t edge_count() {
return (size_t)(built_edges.size());
// Used to disconnect this container's owner from all recorded edges.
template< typename S > void sender_extract( S &s );
template< typename R > void receiver_extract( R &r );
edge_vector built_edges;
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on.
    Counts continue_msg puts under a spin_mutex; when the count reaches the
    number of registered predecessors it resets the count and calls execute(). */
class continue_receiver : public receiver< continue_msg > {
typedef continue_msg input_type;
//! The predecessor type for this node
typedef sender< continue_msg > predecessor_type;
//! Constructor: threshold starts at number_of_predecessors, count at zero.
continue_receiver( int number_of_predecessors = 0 ) {
my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
my_current_count = 0;
//! Copy constructor: copies the *initial* threshold, restarts count at zero.
continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
my_current_count = 0;
virtual ~continue_receiver() { }
//! Increments the trigger threshold
/* override */ bool register_predecessor( predecessor_type & ) {
spin_mutex::scoped_lock l(my_mutex);
++my_predecessor_count;
//! Decrements the trigger threshold
/** Does not check to see if the removal of the predecessor now makes the current count
exceed the new threshold. So removing a predecessor while the graph is active can cause
unexpected results. */
/* override */ bool remove_predecessor( predecessor_type & ) {
spin_mutex::scoped_lock l(my_mutex);
--my_predecessor_count;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef std::vector<predecessor_type *> predecessor_vector_type;
// Edge bookkeeping for the preview extract()/reset() API; all take my_mutex.
/*override*/ void internal_add_built_predecessor( predecessor_type &s) {
spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.add_edge( s );
/*override*/ void internal_delete_built_predecessor( predecessor_type &s) {
spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.delete_edge(s);
/*override*/ void copy_predecessors( predecessor_vector_type &v) {
spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.copy_edges(v);
/*override*/ size_t predecessor_count() {
spin_mutex::scoped_lock l(my_mutex);
return my_built_predecessors.edge_count();
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache;
template<typename X, typename Y> friend class internal::round_robin_cache;
// execute body is supposed to be too small to create a task for.
// Returns the sentinel until the threshold is reached, then resets the
// counter and runs execute() inline.
/* override */ task *try_put_task( const input_type & ) {
spin_mutex::scoped_lock l(my_mutex);
if ( ++my_current_count < my_predecessor_count )
return SUCCESSFULLY_ENQUEUED;
my_current_count = 0;
task * res = execute();
if(!res) return SUCCESSFULLY_ENQUEUED;
// NOTE(review): the 'return res;' tail of this method is elided here.
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<predecessor_type> my_built_predecessors;
int my_predecessor_count;
int my_current_count;
int my_initial_predecessor_count;
// the friend declaration in the base class did not eliminate the "protected class"
// error in gcc 4.1.2
template<typename U> friend class limiter_node;
// Restores the initial count/threshold; under rf_extract (preview) also
// disconnects recorded predecessor edges.
/*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) )
my_current_count = 0;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_built_predecessors.receiver_extract(*this);
my_predecessor_count = my_initial_predecessor_count;
//! Does whatever should happen when the threshold is reached
/** This should be very fast or else spawn a task. This is
called while the sender is blocked in the try_put(). */
virtual task * execute() = 0;
template<typename TT, typename M>
friend class internal::successor_cache;
/*override*/ bool is_continue_receiver() { return true; }
// Tracing hooks (the tbb::internal::fgt_* calls used below).
#include "internal/_flow_graph_trace_impl.h"
// NOTE(review): interface7 appears to be reopened here; the lines closing the
// earlier namespace scope are elided in this excerpt.
namespace interface7 {
// Implementation machinery is included *inside* the namespace on purpose.
#include "internal/_flow_graph_types_impl.h"
#include "internal/_flow_graph_impl.h"
using namespace internal::graph_policy_namespace;
//! Forward iterator over the intrusive list of graph_nodes owned by a graph.
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
friend class graph_node;
// Standard forward-iterator typedefs.
typedef size_t size_type;
typedef GraphNodeType value_type;
typedef GraphNodeType* pointer;
typedef GraphNodeType& reference;
typedef const GraphNodeType& const_reference;
typedef std::forward_iterator_tag iterator_category;
//! Default constructor
graph_iterator() : my_graph(NULL), current_node(NULL) {}
//! Copy constructor
graph_iterator(const graph_iterator& other) :
my_graph(other.my_graph), current_node(other.current_node)
//! Assignment (self-assignment-safe)
graph_iterator& operator=(const graph_iterator& other) {
if (this != &other) {
my_graph = other.my_graph;
current_node = other.current_node;
// Dereference; defined out of line below.
reference operator*() const;
pointer operator->() const;
//! Equality: same graph AND same position.
bool operator==(const graph_iterator& other) const {
return ((my_graph == other.my_graph) && (current_node == other.current_node));
432 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
//! Pre-increment (body elided in this excerpt; advances via internal_forward).
graph_iterator& operator++() {
//! Post-increment: copy, advance, return the copy.
graph_iterator operator++(int) {
graph_iterator result = *this;
// the graph over which we are iterating
GraphContainerType *my_graph;
// pointer into my_graph's my_nodes list
pointer current_node;
//! Private initializing constructor for begin() and end() iterators
graph_iterator(GraphContainerType *g, bool begin);
void internal_forward();
/** This class serves as a handle to the graph.
    Owns the root empty_task whose reference count tracks outstanding work;
    wait_for_all() blocks on it. Also keeps an intrusive list of registered
    graph_nodes for iteration and reset(). */
class graph : tbb::internal::no_copy {
friend class graph_node;
// Task wrapper used by run(Body): executes the body once.
template< typename Body >
class run_task : public task {
run_task( Body& body ) : my_body(body) {}
// Task wrapper used by run(Receiver,Body): executes the body once and
// feeds the result to a receiver.
template< typename Receiver, typename Body >
class run_and_put_task : public task {
run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
task *res = my_receiver.try_put_task( my_body() );
// Sentinel means "nothing further to run" for the scheduler.
if(res == SUCCESSFULLY_ENQUEUED) res = NULL;
Receiver &my_receiver;
//! Constructs a graph with isolated task_group_context
// NOTE(review): assignments such as 'own_context = true;' are elided in
// this excerpt (the destructor below reads own_context).
explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
caught_exception = false;
my_context = new task_group_context();
my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
// ref_count 1 == "idle": wait_for_all returns when it drops back to 1.
my_root_task->set_ref_count(1);
tbb::internal::fgt_graph( this );
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! Constructs a graph with use_this_context as context
explicit graph(task_group_context& use_this_context) :
my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
my_root_task->set_ref_count(1);
tbb::internal::fgt_graph( this );
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! Destroys the graph.
/** Calls wait_for_all, then destroys the root task and context. */
// NOTE(review): the '~graph() {' line and the wait_for_all() call are
// elided in this excerpt.
my_root_task->set_ref_count(0);
task::destroy( *my_root_task );
if (own_context) delete my_context;
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
//! Labels the graph for tracing tools.
void set_name( const char *name ) {
tbb::internal::fgt_graph_desc( this, name );
//! Used to register that an external entity may still interact with the graph.
/** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
void increment_wait_count() {
my_root_task->increment_ref_count();
//! Deregisters an external entity that may have interacted with the graph.
/** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
matches the number of increment_wait_count calls. */
void decrement_wait_count() {
my_root_task->decrement_ref_count();
//! Spawns a task that runs a body and puts its output to a specific receiver
/** The task is spawned as a child of the graph. This is useful for running tasks
that need to block a wait_for_all() on the graph. For example a one-off source. */
template< typename Receiver, typename Body >
void run( Receiver &r, Body body ) {
FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
run_and_put_task< Receiver, Body >( r, body )) );
//! Spawns a task that runs a function object
/** The task is spawned as a child of the graph. This is useful for running tasks
that need to block a wait_for_all() on the graph. For example a one-off source. */
template< typename Body >
void run( Body body ) {
FLOW_SPAWN( * new ( task::allocate_additional_child_of( *my_root_task ) ) run_task< Body >( body ) );
//! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
/** The waiting thread will go off and steal work while it is block in the wait_for_all. */
// NOTE(review): the try/catch structure of this method is elided in this
// excerpt; on exception it restores ref_count and records caught_exception.
void wait_for_all() {
caught_exception = false;
#if TBB_USE_EXCEPTIONS
my_root_task->wait_for_all();
cancelled = my_context->is_group_execution_cancelled();
#if TBB_USE_EXCEPTIONS
my_root_task->set_ref_count(1);
caught_exception = true;
my_context->reset(); // consistent with behavior in catch()
my_root_task->set_ref_count(1);
//! Returns the root task of the graph
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Preview: toggle whether spawned graph work is allowed to run.
void set_active(bool a = true) {
template<typename C, typename N>
friend class graph_iterator;
// Graph iterator typedefs
typedef graph_iterator<graph,graph_node> iterator;
typedef graph_iterator<const graph,const graph_node> const_iterator;
// Graph iterator constructors
iterator begin() { return iterator(this, true); }
iterator end() { return iterator(this, false); }
//! start const iterator
const_iterator begin() const { return const_iterator(this, true); }
//! end const iterator
const_iterator end() const { return const_iterator(this, false); }
//! start const iterator
const_iterator cbegin() const { return const_iterator(this, true); }
//! end const iterator
const_iterator cend() const { return const_iterator(this, false); }
//! return status of graph execution
bool is_cancelled() { return cancelled; }
bool exception_thrown() { return caught_exception; }
// thread-unsafe state reset.
void reset(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol));
task_group_context *my_context;
bool caught_exception;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Head and tail of the intrusive, doubly-linked node list.
graph_node *my_nodes, *my_nodes_last;
// Guards my_nodes/my_nodes_last against concurrent register/remove.
spin_mutex nodelist_mutex;
void register_node(graph_node *n);
void remove_node(graph_node *n);
//! begin() starts at the list head; end() (begin==false) stays NULL.
template <typename C, typename N>
graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
if (begin) current_node = my_graph->my_nodes;
//else it is an end iterator by default
//! Dereference; asserts the iterator is not at end.
template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
__TBB_ASSERT(current_node, "graph_iterator at end");
return *operator->();
//! Member access (body elided in this excerpt).
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
//! Advance to the next node in the graph's intrusive list (no-op at end).
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
if (current_node) current_node = current_node->next;
//! The base of all graph nodes.
/** Registers itself with its graph on construction and removes itself on
    destruction, so graph::reset() can visit every live node. */
class graph_node : tbb::internal::no_assign {
template<typename C, typename N>
friend class graph_iterator;
// Links for the graph's intrusive node list.
graph_node *next, *prev;
graph_node(graph& g) : my_graph(g) {
my_graph.register_node(this);
virtual ~graph_node() {
my_graph.remove_node(this);
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
virtual void set_name( const char *name ) = 0;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! Preview: detach this node's edges; deactivates the graph around the
//! reset so no new work is spawned mid-extraction.
virtual void extract( reset_flags f=rf_extract ) {
bool a = my_graph.is_active();
my_graph.set_active(false);
reset((reset_flags)(f|rf_extract));
my_graph.set_active(a);
// Each concrete node restores its initial state here.
virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol)) = 0;
//! Append n to the graph's intrusive node list (tail insertion, under lock).
inline void graph::register_node(graph_node *n) {
spin_mutex::scoped_lock lock(nodelist_mutex);
n->prev = my_nodes_last;
if (my_nodes_last) my_nodes_last->next = n;
// NOTE(review): 'my_nodes_last = n;' (and 'n->next = NULL;') are elided in
// this excerpt — the full file updates the tail pointer here.
if (!my_nodes) my_nodes = n;
//! Unlink n from the graph's intrusive node list (under lock).
inline void graph::remove_node(graph_node *n) {
spin_mutex::scoped_lock lock(nodelist_mutex);
__TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
// Splice n out of the doubly-linked list, fixing head/tail if needed.
if (n->prev) n->prev->next = n->next;
if (n->next) n->next->prev = n->prev;
if (my_nodes_last == n) my_nodes_last = n->prev;
if (my_nodes == n) my_nodes = n->next;
// Leave n unlinked so a stale iterator cannot walk through it.
n->prev = n->next = NULL;
//! Thread-unsafe whole-graph reset: clears context/exception state and
//! resets every registered node. (Interior lines elided in this excerpt.)
inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
// Preserve the root task across the reset.
task *saved_my_root_task = my_root_task;
if(my_context) my_context->reset();
caught_exception = false;
// reset all the nodes comprising the graph
for(iterator ii = begin(); ii != end(); ++ii) {
graph_node *my_p = &(*ii);
my_p->reset(__TBB_PFG_RESET_ARG(f));
my_root_task = saved_my_root_task;
756 #include "internal/_flow_graph_node_impl.h"
//! An executable node that acts as a source, i.e. it has no predecessors
/** Repeatedly invokes a user body to produce items, caching at most one item
    (my_cached_item/my_has_cached_item) and supporting the reserve/consume/
    release protocol. All state is guarded by my_mutex. */
template < typename Output >
class source_node : public graph_node, public sender< Output > {
using graph_node::my_graph;
//! The type of the output message, which is complete
typedef Output output_type;
//! The type of successors of this node
typedef receiver< Output > successor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef std::vector<successor_type *> successor_vector_type;
//! Constructor for a node with a successor
// The body is heap-copied into a source_body_leaf so it can be cloned/reset.
template< typename Body >
source_node( graph &g, Body body, bool is_active = true )
: graph_node(g), my_active(is_active), init_my_active(is_active),
my_body( new internal::source_body_leaf< output_type, Body>(body) ),
my_reserved(false), my_has_cached_item(false)
my_successors.set_owner(this);
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
static_cast<sender<output_type> *>(this), this->my_body );
//! Copy constructor: clones the body, starts with src's *initial* activity
//! state and no cached item.
source_node( const source_node& src ) :
graph_node(src.my_graph), sender<Output>(),
my_active(src.init_my_active),
init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
my_reserved(false), my_has_cached_item(false)
my_successors.set_owner(this);
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
static_cast<sender<output_type> *>(this), this->my_body );
//! The destructor frees the owned body copy.
~source_node() { delete my_body; }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name );
//! Add a new successor to this node
/* override */ bool register_successor( successor_type &r ) {
spin_mutex::scoped_lock lock(my_mutex);
my_successors.register_successor(r);
//! Removes a successor from this node
/* override */ bool remove_successor( successor_type &r ) {
spin_mutex::scoped_lock lock(my_mutex);
my_successors.remove_successor(r);
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Preview edge bookkeeping; all delegate to my_successors under my_mutex.
/*override*/void internal_add_built_successor( successor_type &r) {
spin_mutex::scoped_lock lock(my_mutex);
my_successors.internal_add_built_successor(r);
/*override*/void internal_delete_built_successor( successor_type &r) {
spin_mutex::scoped_lock lock(my_mutex);
my_successors.internal_delete_built_successor(r);
/*override*/size_t successor_count() {
spin_mutex::scoped_lock lock(my_mutex);
return my_successors.successor_count();
/*override*/void copy_successors(successor_vector_type &v) {
spin_mutex::scoped_lock l(my_mutex);
my_successors.copy_successors(v);
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Request an item from the node
// Hands over the cached item if present; otherwise the (elided) path
// enqueues a task to produce one.
/*override */ bool try_get( output_type &v ) {
spin_mutex::scoped_lock lock(my_mutex);
if ( my_has_cached_item ) {
my_has_cached_item = false;
// we've been asked to provide an item, but we have none. enqueue a task to
//! Reserves an item.
/* override */ bool try_reserve( output_type &v ) {
spin_mutex::scoped_lock lock(my_mutex);
if ( my_has_cached_item ) {
//! Release a reserved item.
/** true = item has been released and so remains in sender, dest must request or reserve future items */
/* override */ bool try_release( ) {
spin_mutex::scoped_lock lock(my_mutex);
__TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
if(!my_successors.empty())
//! Consumes a reserved item
/* override */ bool try_consume( ) {
spin_mutex::scoped_lock lock(my_mutex);
__TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
my_has_cached_item = false;
if ( !my_successors.empty() ) {
//! Activates a node that was created in the inactive state
// NOTE(review): the 'void activate()' signature line is elided in this excerpt.
spin_mutex::scoped_lock lock(my_mutex);
if ( !my_successors.empty() )
//! Retrieve a copy of the user body (throws std::bad_cast on type mismatch).
template<typename Body>
Body copy_function_object() {
internal::source_body<output_type> &body_ref = *this->my_body;
return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
//! resets the source_node to its initial state
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_active = init_my_active;
if(my_has_cached_item) {
my_has_cached_item = false;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f);
if(f & rf_reset_bodies) my_body->reset_body();
internal::source_body<output_type> *my_body;
internal::broadcast_cache< output_type > my_successors;
bool my_has_cached_item;
output_type my_cached_item;
// used by apply_body, can invoke body of node.
// Runs the body at most once to fill the one-item cache, then reserves it.
bool try_reserve_apply_body(output_type &v) {
spin_mutex::scoped_lock lock(my_mutex);
if ( !my_has_cached_item ) {
tbb::internal::fgt_begin_body( my_body );
bool r = (*my_body)(my_cached_item);
tbb::internal::fgt_end_body( my_body );
// Body returned true => it produced an item.
my_has_cached_item = true;
if ( my_has_cached_item ) {
//! Spawns a task that applies the body
/* override */ void spawn_put( ) {
task* tp = this->my_graph.root_task();
FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
internal:: source_task_bypass < source_node< output_type > >( *this ) ) );
friend class internal::source_task_bypass< source_node< output_type > >;
//! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
/* override */ task * apply_body_bypass( ) {
if ( !try_reserve_apply_body(v) )
task *last_task = my_successors.try_put_task(v);
//! Implements a function node that supports Input -> Output
/** Primary template (rejecting policy): inherits input handling — concurrency
    limiting and body invocation — from internal::function_input, and output
    broadcasting from internal::function_output. */
template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
using graph_node::my_graph;
typedef Input input_type;
typedef Output output_type;
typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type;
typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using typename internal::function_input<Input,Output,Allocator>::predecessor_vector_type;
using typename internal::function_output<Output>::successor_vector_type;
//! Constructor: 'concurrency' bounds simultaneous body invocations.
template< typename Body >
function_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body) {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this), this->my_body );
//! Copy constructor
function_node( const function_node& src ) :
graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this), this->my_body );
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name );
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache;
template<typename X, typename Y> friend class internal::round_robin_cache;
using fInput_type::try_put_task;
// override of graph_node's reset.
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
successors().reset(f);
__TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
// NOTE(review): unlike the queueing specialization below, this assert is
// not guarded by (f & rf_extract) — confirm the asymmetry is intended.
__TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
/* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
//! Implements a function node that supports Input -> Output
/** Partial specialization for the queueing policy: identical to the primary
    template except the function_input is given an input queue, so puts that
    exceed the concurrency limit are buffered rather than rejected. */
template < typename Input, typename Output, typename Allocator >
class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
using graph_node::my_graph;
typedef Input input_type;
typedef Output output_type;
typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type;
typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
typedef internal::function_input_queue<input_type, Allocator> queue_type;
typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using typename internal::function_input<Input,Output,Allocator>::predecessor_vector_type;
using typename internal::function_output<Output>::successor_vector_type;
//! Constructor: the freshly allocated queue_type is owned by fInput_type.
template< typename Body >
function_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), fInput_type( g, concurrency, body, new queue_type() ) {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this), this->my_body );
//! Copy constructor
function_node( const function_node& src ) :
graph_node(src.graph_node::my_graph), fInput_type( src, new queue_type() ), fOutput_type() {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this), this->my_body );
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name );
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache;
template<typename X, typename Y> friend class internal::round_robin_cache;
using fInput_type::try_put_task;
// Reset input machinery, then (preview) successor edges; both asserts are
// guarded on rf_extract here.
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
successors().reset(f);
__TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
__TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "function_node predecessors not empty");
/* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
/** Each tuple element gets its own multifunction_output port; the body may
    put to any subset of the ports per invocation. */
template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class multifunction_node :
public internal::multifunction_input
// Build the output-ports tuple type by wrapping every element of Output.
typename internal::wrap_tuple_elements<
tbb::flow::tuple_size<Output>::value, // #elements in tuple
internal::multifunction_output, // wrap this around each element
Output // the tuple providing the types
using graph_node::my_graph;
static const int N = tbb::flow::tuple_size<Output>::value;
typedef Input input_type;
typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
//! Constructor: 'concurrency' bounds simultaneous body invocations.
template<typename Body>
multifunction_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), base_type(g,concurrency, body) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
&this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
this->output_ports(), this->my_body );
//! Copy constructor
multifunction_node( const multifunction_node &other) :
graph_node(other.graph_node::my_graph), base_type(other) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
&this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
this->output_ports(), this->my_body );
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_multioutput_node_desc( this, name );
// all the guts are in multifunction_input...
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
}; // multifunction_node
// Specialization for the queueing policy: identical interface to the primary
// template, but the multifunction_input base is given an internal input queue
// (new queue_type()), so inputs can be buffered rather than rejected.
// NOTE(review): embedded line numbers show elided lines in this listing.
1156 template < typename Input, typename Output, typename Allocator >
1157 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
1158 typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::value, internal::multifunction_output, Output>::type, Allocator> {
1160 using graph_node::my_graph;
1161 static const int N = tbb::flow::tuple_size<Output>::value;
1163 typedef Input input_type;
1164 typedef typename internal::wrap_tuple_elements<N, internal::multifunction_output, Output>::type output_ports_type;
1166 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1167 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
1169 template<typename Body>
1170 multifunction_node( graph &g, size_t concurrency, Body body) :
// The freshly allocated queue is handed to the base, which owns it.
1171 graph_node(g), base_type(g,concurrency, body, new queue_type()) {
1172 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1173 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1174 this->output_ports(), this->my_body );
1177 multifunction_node( const multifunction_node &other) :
// The copy gets its own new queue; buffered items are not carried over.
1178 graph_node(other.graph_node::my_graph), base_type(other, new queue_type()) {
1179 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1180 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1181 this->output_ports(), this->my_body );
1184 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1185 /* override */ void set_name( const char *name ) {
1186 tbb::internal::fgt_multioutput_node_desc( this, name );
1190 // all the guts are in multifunction_input...
1192 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
1193 }; // multifunction_node
1195 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1196 // successors. The node has unlimited concurrency, so though it is marked as
1197 // "rejecting" it does not reject inputs.
1198 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1199 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
1200 static const int N = tbb::flow::tuple_size<TupleType>::value;
1201 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
1203 typedef typename base_type::output_ports_type output_ports_type;
// The fixed body: emit each element of the incoming tuple to the output
// port of matching index (internal::emit_element does the unrolling).
1205 struct splitting_body {
1206 void operator()(const TupleType& t, output_ports_type &p) {
1207 internal::emit_element<N>::emit_this(t, p);
1211 typedef TupleType input_type;
1212 typedef Allocator allocator_type;
// Constructed with "unlimited" concurrency so puts are never rejected.
1213 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
1214 tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1215 static_cast<receiver<input_type> *>(this), this->output_ports() );
1218 split_node( const split_node & other) : base_type(other) {
1219 tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1220 static_cast<receiver<input_type> *>(this), this->output_ports() );
1223 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1224 /* override */ void set_name( const char *name ) {
1225 tbb::internal::fgt_multioutput_node_desc( this, name );
// NOTE(review): the class's closing lines (orig. 1226-1230) are elided here.
1231 //! Implements an executable node that supports continue_msg -> Output
1232 template <typename Output>
1233 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
1235 using graph_node::my_graph;
1237 typedef continue_msg input_type;
1238 typedef Output output_type;
1239 typedef sender< input_type > predecessor_type;
1240 typedef receiver< output_type > successor_type;
1241 typedef internal::continue_input<Output> fInput_type;
1242 typedef internal::function_output<output_type> fOutput_type;
1244 //! Constructor for executable node with continue_msg -> Output
1245 template <typename Body >
1246 continue_node( graph &g, Body body ) :
1247 graph_node(g), internal::continue_input<output_type>( g, body ) {
1248 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1249 static_cast<receiver<input_type> *>(this),
1250 static_cast<sender<output_type> *>(this), this->my_body );
1254 //! Constructor for executable node with continue_msg -> Output
1255 template <typename Body >
// number_of_predecessors is forwarded to continue_input; presumably it
// pre-sets the trigger count before edges exist -- confirm against
// continue_input's definition (not visible in this listing).
1256 continue_node( graph &g, int number_of_predecessors, Body body ) :
1257 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1258 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1259 static_cast<receiver<input_type> *>(this),
1260 static_cast<sender<output_type> *>(this), this->my_body );
1263 //! Copy constructor
1264 continue_node( const continue_node& src ) :
1265 graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
1266 internal::function_output<Output>() {
1267 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1268 static_cast<receiver<input_type> *>(this),
1269 static_cast<sender<output_type> *>(this), this->my_body );
1272 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1273 /* override */ void set_name( const char *name ) {
1274 tbb::internal::fgt_node_desc( this, name );
// Friends need access to try_put_task, inherited from continue_input.
1279 template< typename R, typename B > friend class run_and_put_task;
1280 template<typename X, typename Y> friend class internal::broadcast_cache;
1281 template<typename X, typename Y> friend class internal::round_robin_cache;
1282 using fInput_type::try_put_task;
1284 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1285 fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f));
1286 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1287 successors().reset(f);
1288 __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_node not reset");
1292 /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1295 template< typename T >
// overwrite_node: single-item buffer that always accepts a put, overwriting
// any previously stored value; a newly registered successor is immediately
// offered the current value if one is present. All state is guarded by
// my_mutex (a spin_mutex).
1296 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
1298 using graph_node::my_graph;
1300 typedef T input_type;
1301 typedef T output_type;
1302 typedef sender< input_type > predecessor_type;
1303 typedef receiver< output_type > successor_type;
1304 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1305 typedef std::vector<predecessor_type *> predecessor_vector_type;
1306 typedef std::vector<successor_type *> successor_vector_type;
1309 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
1310 my_successors.set_owner( this );
1311 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1312 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1315 // Copy constructor; doesn't take anything from src; default won't work
1316 overwrite_node( const overwrite_node& src ) :
1317 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
1319 my_successors.set_owner( this );
1320 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1321 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1324 ~overwrite_node() {}
1326 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1327 /* override */ void set_name( const char *name ) {
1328 tbb::internal::fgt_node_desc( this, name );
1332 /* override */ bool register_successor( successor_type &s ) {
1333 spin_mutex::scoped_lock l( my_mutex );
1334 task* tp = this->my_graph.root_task(); // just to test if we are resetting
1335 if (my_buffer_is_valid && tp) {
1336 // We have a valid value that must be forwarded immediately.
1337 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
1338 // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
1339 my_successors.register_successor( s );
1341 // We don't add the successor: it rejected our put and we became its predecessor instead
1345 // No valid value yet, just add as successor
1346 my_successors.register_successor( s );
1351 /* override */ bool remove_successor( successor_type &s ) {
1352 spin_mutex::scoped_lock l( my_mutex );
1353 my_successors.remove_successor(s);
1357 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Preview-feature edge bookkeeping; every operation takes my_mutex.
1358 /*override*/void internal_add_built_successor( successor_type &s) {
1359 spin_mutex::scoped_lock l( my_mutex );
1360 my_successors.internal_add_built_successor(s);
1363 /*override*/void internal_delete_built_successor( successor_type &s) {
1364 spin_mutex::scoped_lock l( my_mutex );
1365 my_successors.internal_delete_built_successor(s);
1368 /*override*/size_t successor_count() {
1369 spin_mutex::scoped_lock l( my_mutex );
1370 return my_successors.successor_count();
1373 /*override*/ void copy_successors(successor_vector_type &v) {
1374 spin_mutex::scoped_lock l( my_mutex );
1375 my_successors.copy_successors(v);
1378 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1379 spin_mutex::scoped_lock l( my_mutex );
1380 my_built_predecessors.add_edge(p);
1383 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1384 spin_mutex::scoped_lock l( my_mutex );
1385 my_built_predecessors.delete_edge(p);
1388 /*override*/size_t predecessor_count() {
1389 spin_mutex::scoped_lock l( my_mutex );
1390 return my_built_predecessors.edge_count();
1393 /*override*/void copy_predecessors(predecessor_vector_type &v) {
1394 spin_mutex::scoped_lock l( my_mutex );
1395 my_built_predecessors.copy_edges(v);
1397 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1399 /* override */ bool try_get( input_type &v ) {
1400 spin_mutex::scoped_lock l( my_mutex );
1401 if ( my_buffer_is_valid ) {
// NOTE(review): orig. lines 1402-1408 are elided from this listing; presumably
// the buffered value is copied to v and true returned -- confirm upstream.
// The two fragments below appear to be the bodies of is_valid()/invalidate().
1409 spin_mutex::scoped_lock l( my_mutex );
1410 return my_buffer_is_valid;
1414 spin_mutex::scoped_lock l( my_mutex );
1415 my_buffer_is_valid = false;
1419 template< typename R, typename B > friend class run_and_put_task;
1420 template<typename X, typename Y> friend class internal::broadcast_cache;
1421 template<typename X, typename Y> friend class internal::round_robin_cache;
1422 /* override */ task * try_put_task( const input_type &v ) {
1423 spin_mutex::scoped_lock l( my_mutex );
1425 my_buffer_is_valid = true;
1426 task * rtask = my_successors.try_put_task(v);
// If no successor produced a task, report the put as successfully enqueued
// so the caller does not retry.
1427 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1431 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1432 my_buffer_is_valid = false;
1433 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1434 my_successors.reset(f);
1436 my_built_predecessors.receiver_extract(*this);
1441 spin_mutex my_mutex;
1442 internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
1443 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1444 edge_container<sender<input_type> > my_built_predecessors;
1446 input_type my_buffer;
1447 bool my_buffer_is_valid;
// Receiver-side reset is intentionally a no-op; buffer state is cleared in
// reset() above.
1448 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1449 }; // overwrite_node
1451 template< typename T >
// write_once_node: derives from overwrite_node but keeps the FIRST stored
// value; try_put_task checks my_buffer_is_valid before storing (see below).
1452 class write_once_node : public overwrite_node<T> {
1454 typedef T input_type;
1455 typedef T output_type;
1456 typedef sender< input_type > predecessor_type;
1457 typedef receiver< output_type > successor_type;
1460 write_once_node(graph& g) : overwrite_node<T>(g) {
1461 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1462 static_cast<receiver<input_type> *>(this),
1463 static_cast<sender<output_type> *>(this) );
1466 //! Copy constructor: call base class copy constructor
1467 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {
1468 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1469 static_cast<receiver<input_type> *>(this),
1470 static_cast<sender<output_type> *>(this) );
1473 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1474 /* override */ void set_name( const char *name ) {
1475 tbb::internal::fgt_node_desc( this, name );
1480 template< typename R, typename B > friend class run_and_put_task;
1481 template<typename X, typename Y> friend class internal::broadcast_cache;
1482 template<typename X, typename Y> friend class internal::round_robin_cache;
1483 /* override */ task *try_put_task( const T &v ) {
1484 spin_mutex::scoped_lock l( this->my_mutex );
1485 if ( this->my_buffer_is_valid ) {
// NOTE(review): orig. lines 1486-1487 are elided from this listing; presumably
// the already-valid case returns without storing v -- confirm upstream.
1488 this->my_buffer = v;
1489 this->my_buffer_is_valid = true;
1490 task *res = this->my_successors.try_put_task(v);
// No successor task still counts as a successful enqueue for the caller.
1491 if(!res) res = SUCCESSFULLY_ENQUEUED;
1497 //! Forwards messages of type T to all successors
1498 template <typename T>
1499 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1501 using graph_node::my_graph;
1503 typedef T input_type;
1504 typedef T output_type;
1505 typedef sender< input_type > predecessor_type;
1506 typedef receiver< output_type > successor_type;
1507 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1508 typedef std::vector<predecessor_type *> predecessor_vector_type;
1509 typedef std::vector<successor_type *> successor_vector_type;
1512 internal::broadcast_cache<input_type> my_successors;
1513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1514 edge_container<predecessor_type> my_built_predecessors;
1515 spin_mutex pred_mutex;
1519 broadcast_node(graph& g) : graph_node(g) {
1520 my_successors.set_owner( this );
1521 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1522 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
// Copy constructor: copies only graph membership, not edges or state.
1526 broadcast_node( const broadcast_node& src ) :
1527 graph_node(src.my_graph), receiver<T>(), sender<T>()
1529 my_successors.set_owner( this );
1530 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1531 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1534 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1535 /* override */ void set_name( const char *name ) {
1536 tbb::internal::fgt_node_desc( this, name );
1540 //! Adds a successor
1541 virtual bool register_successor( receiver<T> &r ) {
1542 my_successors.register_successor( r );
1546 //! Removes s as a successor
1547 virtual bool remove_successor( receiver<T> &r ) {
1548 my_successors.remove_successor( r );
1552 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Preview-feature edge bookkeeping (built successors/predecessors).
1553 /*override*/ void internal_add_built_successor(successor_type &r) {
1554 my_successors.internal_add_built_successor(r);
1557 /*override*/ void internal_delete_built_successor(successor_type &r) {
1558 my_successors.internal_delete_built_successor(r);
1561 /*override*/ size_t successor_count() {
1562 return my_successors.successor_count();
1565 /*override*/ void copy_successors(successor_vector_type &v) {
1566 my_successors.copy_successors(v);
1569 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1570 my_built_predecessors.add_edge(p);
1573 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1574 my_built_predecessors.delete_edge(p);
1577 /*override*/ size_t predecessor_count() {
1578 return my_built_predecessors.edge_count();
1581 /*override*/ void copy_predecessors(predecessor_vector_type &v) {
1582 my_built_predecessors.copy_edges(v);
1584 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1587 template< typename R, typename B > friend class run_and_put_task;
1588 template<typename X, typename Y> friend class internal::broadcast_cache;
1589 template<typename X, typename Y> friend class internal::round_robin_cache;
1590 //! build a task to run the successor if possible. Default is old behavior.
1591 /*override*/ task *try_put_task(const T& t) {
1592 task *new_task = my_successors.try_put_task(t);
// No task from the successor cache still counts as a successful put.
1593 if(!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1597 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1598 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1599 my_successors.reset(f);
1601 my_built_predecessors.receiver_extract(*this);
1603 __TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error resetting broadcast_node");
// broadcast_node holds no receiver-side state, so reset_receiver is a no-op.
1606 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1607 }; // broadcast_node
1609 //! Forwards messages in arbitrary order
// All mutations of the underlying item buffer are funneled through
// my_aggregator: public entry points build a buffer_operation record and call
// my_aggregator.execute(); handle_operations() then dispatches the batched
// records on a single thread, so the internal_* handlers run without a lock.
// NOTE(review): embedded line numbers jump throughout; closing braces and some
// statements of this class are elided from this listing.
1610 template <typename T, typename A=cache_aligned_allocator<T> >
1611 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1613 using graph_node::my_graph;
1615 typedef T input_type;
1616 typedef T output_type;
1617 typedef sender< input_type > predecessor_type;
1618 typedef receiver< output_type > successor_type;
1619 typedef buffer_node<T, A> my_class;
1620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1621 typedef std::vector<predecessor_type *> predecessor_vector_type;
1622 typedef std::vector<successor_type *> successor_vector_type;
1625 typedef size_t size_type;
1626 internal::round_robin_cache< T, null_rw_mutex > my_successors;
1628 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1629 edge_container<predecessor_type> my_built_predecessors;
1632 friend class internal::forward_task_bypass< buffer_node< T, A > >;
// Operation codes dispatched by handle_operations().
1634 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1635 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1636 , add_blt_succ, del_blt_succ,
1637 add_blt_pred, del_blt_pred,
1638 blt_succ_cnt, blt_pred_cnt,
1639 blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1642 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1644 // implements the aggregator_operation concept
1645 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1648 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1653 predecessor_type *p;
1655 successor_vector_type *svec;
1656 predecessor_vector_type *pvec;
// elem aliases the caller's item; const_cast because pops write through it.
1663 buffer_operation(const T& e, op_type t) : type(char(t))
1665 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1666 , ltask(NULL), elem(const_cast<T*>(&e))
1668 , elem(const_cast<T*>(&e)) , ltask(NULL)
1671 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1674 bool forwarder_busy;
1675 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1676 friend class internal::aggregating_functor<my_class, buffer_operation>;
1677 internal::aggregator< my_handler, buffer_operation> my_aggregator;
// Drains the batched operation list, dispatching each record to its
// internal_* handler; schedules a forwarder task afterwards if any handled
// operation could have made an item forwardable.
1679 virtual void handle_operations(buffer_operation *op_list) {
1680 buffer_operation *tmp = NULL;
1681 bool try_forwarding=false;
1684 op_list = op_list->next;
1685 switch (tmp->type) {
1686 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1687 case rem_succ: internal_rem_succ(tmp); break;
1688 case req_item: internal_pop(tmp); break;
1689 case res_item: internal_reserve(tmp); break;
1690 case rel_res: internal_release(tmp); try_forwarding = true; break;
1691 case con_res: internal_consume(tmp); try_forwarding = true; break;
1692 case put_item: internal_push(tmp); try_forwarding = (tmp->status == SUCCEEDED); break;
1693 case try_fwd_task: internal_forward_task(tmp); break;
1694 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1696 case add_blt_succ: internal_add_built_succ(tmp); break;
1697 case del_blt_succ: internal_del_built_succ(tmp); break;
1698 case add_blt_pred: internal_add_built_pred(tmp); break;
1699 case del_blt_pred: internal_del_built_pred(tmp); break;
1700 case blt_succ_cnt: internal_succ_cnt(tmp); break;
1701 case blt_pred_cnt: internal_pred_cnt(tmp); break;
1702 case blt_succ_cpy: internal_copy_succs(tmp); break;
1703 case blt_pred_cpy: internal_copy_preds(tmp); break;
// Only one forwarder task is outstanding at a time (forwarder_busy flag).
1707 if (try_forwarding && !forwarder_busy) {
1708 task* tp = this->my_graph.root_task();
1710 forwarder_busy = true;
1711 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
1713 < buffer_node<input_type, A> >(*this);
1714 // tmp should point to the last item handled by the aggregator. This is the operation
1715 // the handling thread enqueued. So modifying that record will be okay.
1716 tbb::task *z = tmp->ltask;
1717 tmp->ltask = combine_tasks(z, new_task); // in case the op generated a task
1722 inline task *grab_forwarding_task( buffer_operation &op_data) {
1723 return op_data.ltask;
1726 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1727 task *ft = grab_forwarding_task(op_data);
1735 //! This is executed by an enqueued task, the "forwarder"
1736 virtual task *forward_task() {
1737 buffer_operation op_data(try_fwd_task);
1738 task *last_task = NULL;
1740 op_data.status = WAIT;
1741 op_data.ltask = NULL;
1742 my_aggregator.execute(&op_data);
1743 tbb::task *xtask = op_data.ltask;
1744 last_task = combine_tasks(last_task, xtask);
1745 } while (op_data.status == SUCCEEDED);
1749 //! Register successor
1750 virtual void internal_reg_succ(buffer_operation *op) {
1751 my_successors.register_successor(*(op->r));
1752 __TBB_store_with_release(op->status, SUCCEEDED);
1755 //! Remove successor
1756 virtual void internal_rem_succ(buffer_operation *op) {
1757 my_successors.remove_successor(*(op->r));
1758 __TBB_store_with_release(op->status, SUCCEEDED);
1761 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1762 virtual void internal_add_built_succ(buffer_operation *op) {
1763 my_successors.internal_add_built_successor(*(op->r));
1764 __TBB_store_with_release(op->status, SUCCEEDED);
1767 virtual void internal_del_built_succ(buffer_operation *op) {
1768 my_successors.internal_delete_built_successor(*(op->r));
1769 __TBB_store_with_release(op->status, SUCCEEDED);
1772 virtual void internal_add_built_pred(buffer_operation *op) {
1773 my_built_predecessors.add_edge(*(op->p));
1774 __TBB_store_with_release(op->status, SUCCEEDED);
1777 virtual void internal_del_built_pred(buffer_operation *op) {
1778 my_built_predecessors.delete_edge(*(op->p));
1779 __TBB_store_with_release(op->status, SUCCEEDED);
1782 virtual void internal_succ_cnt(buffer_operation *op) {
1783 op->cnt_val = my_successors.successor_count();
1784 __TBB_store_with_release(op->status, SUCCEEDED);
1787 virtual void internal_pred_cnt(buffer_operation *op) {
1788 op->cnt_val = my_built_predecessors.edge_count();
1789 __TBB_store_with_release(op->status, SUCCEEDED);
1792 virtual void internal_copy_succs(buffer_operation *op) {
1793 my_successors.copy_successors(*(op->svec));
1794 __TBB_store_with_release(op->status, SUCCEEDED);
1797 virtual void internal_copy_preds(buffer_operation *op) {
1798 my_built_predecessors.copy_edges(*(op->pvec));
1799 __TBB_store_with_release(op->status, SUCCEEDED);
1801 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1803 //! Tries to forward valid items to successors
// buffer_node forwards from the back of the buffer (my_tail-1); queue_node
// overrides this to forward from the front for FIFO order.
1804 virtual void internal_forward_task(buffer_operation *op) {
1805 if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) {
1806 __TBB_store_with_release(op->status, FAILED);
1807 this->forwarder_busy = false;
1811 task * last_task = NULL;
1812 size_type counter = my_successors.size();
1813 // Try forwarding, giving each successor a chance
1814 while (counter>0 && !this->buffer_empty() && this->my_item_valid(this->my_tail-1)) {
1815 this->copy_back(i_copy);
1816 task *new_task = my_successors.try_put_task(i_copy);
1818 last_task = combine_tasks(last_task, new_task);
1819 this->destroy_back();
1823 op->ltask = last_task; // return task
1824 if (last_task && !counter) {
1825 __TBB_store_with_release(op->status, SUCCEEDED);
1828 __TBB_store_with_release(op->status, FAILED);
1829 forwarder_busy = false;
1833 virtual void internal_push(buffer_operation *op) {
1834 this->push_back(*(op->elem));
1835 __TBB_store_with_release(op->status, SUCCEEDED);
1838 virtual void internal_pop(buffer_operation *op) {
1839 if(this->pop_back(*(op->elem))) {
1840 __TBB_store_with_release(op->status, SUCCEEDED);
1843 __TBB_store_with_release(op->status, FAILED);
1847 virtual void internal_reserve(buffer_operation *op) {
1848 if(this->reserve_front(*(op->elem))) {
1849 __TBB_store_with_release(op->status, SUCCEEDED);
1852 __TBB_store_with_release(op->status, FAILED);
1856 virtual void internal_consume(buffer_operation *op) {
1857 this->consume_front();
1858 __TBB_store_with_release(op->status, SUCCEEDED);
1861 virtual void internal_release(buffer_operation *op) {
1862 this->release_front();
1863 __TBB_store_with_release(op->status, SUCCEEDED);
1868 buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1869 forwarder_busy(false) {
1870 my_successors.set_owner(this);
1871 my_aggregator.initialize_handler(my_handler(this));
1872 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1873 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1876 //! Copy constructor
1877 buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
1878 internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1879 forwarder_busy = false;
1880 my_successors.set_owner(this);
1881 my_aggregator.initialize_handler(my_handler(this));
1882 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1883 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1886 virtual ~buffer_node() {}
1888 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1889 /* override */ void set_name( const char *name ) {
1890 tbb::internal::fgt_node_desc( this, name );
1895 // message sender implementation
1898 //! Adds a new successor.
1899 /** Adds successor r to the list of successors; may forward tasks. */
1900 /* override */ bool register_successor( successor_type &r ) {
1901 buffer_operation op_data(reg_succ);
1903 my_aggregator.execute(&op_data);
1904 (void)enqueue_forwarding_task(op_data);
1908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1909 /*override*/ void internal_add_built_successor( successor_type &r) {
1910 buffer_operation op_data(add_blt_succ);
1912 my_aggregator.execute(&op_data);
1915 /*override*/ void internal_delete_built_successor( successor_type &r) {
1916 buffer_operation op_data(del_blt_succ);
1918 my_aggregator.execute(&op_data);
1921 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1922 buffer_operation op_data(add_blt_pred);
1924 my_aggregator.execute(&op_data);
1927 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1928 buffer_operation op_data(del_blt_pred);
1930 my_aggregator.execute(&op_data);
1933 /*override*/ size_t predecessor_count() {
1934 buffer_operation op_data(blt_pred_cnt);
1935 my_aggregator.execute(&op_data);
1936 return op_data.cnt_val;
1939 /*override*/ size_t successor_count() {
1940 buffer_operation op_data(blt_succ_cnt);
1941 my_aggregator.execute(&op_data);
1942 return op_data.cnt_val;
1945 /*override*/ void copy_predecessors( predecessor_vector_type &v ) {
1946 buffer_operation op_data(blt_pred_cpy);
1948 my_aggregator.execute(&op_data);
1951 /*override*/ void copy_successors( successor_vector_type &v ) {
1952 buffer_operation op_data(blt_succ_cpy);
1954 my_aggregator.execute(&op_data);
1958 //! Removes a successor.
1959 /** Removes successor r from the list of successors.
1960 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1961 /* override */ bool remove_successor( successor_type &r ) {
1962 r.remove_predecessor(*this);
1963 buffer_operation op_data(rem_succ);
1965 my_aggregator.execute(&op_data);
1966 // even though this operation does not cause a forward, if we are the handler, and
1967 // a forward is scheduled, we may be the first to reach this point after the aggregator,
1968 // and so should check for the task.
1969 (void)enqueue_forwarding_task(op_data);
1973 //! Request an item from the buffer_node
1974 /** true = v contains the returned item<BR>
1975 false = no item has been returned */
1976 /* override */ bool try_get( T &v ) {
1977 buffer_operation op_data(req_item);
1979 my_aggregator.execute(&op_data);
1980 (void)enqueue_forwarding_task(op_data);
1981 return (op_data.status==SUCCEEDED);
1984 //! Reserves an item.
1985 /** false = no item can be reserved<BR>
1986 true = an item is reserved */
1987 /* override */ bool try_reserve( T &v ) {
1988 buffer_operation op_data(res_item);
1990 my_aggregator.execute(&op_data);
1991 (void)enqueue_forwarding_task(op_data);
1992 return (op_data.status==SUCCEEDED);
1995 //! Release a reserved item.
1996 /** true = item has been released and so remains in sender */
1997 /* override */ bool try_release() {
1998 buffer_operation op_data(rel_res);
1999 my_aggregator.execute(&op_data);
2000 (void)enqueue_forwarding_task(op_data);
2004 //! Consumes a reserved item.
2005 /** true = item is removed from sender and reservation removed */
2006 /* override */ bool try_consume() {
2007 buffer_operation op_data(con_res);
2008 my_aggregator.execute(&op_data);
2009 (void)enqueue_forwarding_task(op_data);
2015 template< typename R, typename B > friend class run_and_put_task;
2016 template<typename X, typename Y> friend class internal::broadcast_cache;
2017 template<typename X, typename Y> friend class internal::round_robin_cache;
2018 //! receive an item, return a task *if possible
2019 /* override */ task *try_put_task(const T &t) {
2020 buffer_operation op_data(t, put_item);
2021 my_aggregator.execute(&op_data);
2022 task *ft = grab_forwarding_task(op_data);
2023 // sequencer_nodes can return failure (if an item has been previously inserted)
2024 // We have to spawn the returned task if our own operation fails.
2026 if(ft && op_data.status == FAILED) {
2027 // we haven't succeeded queueing the item, but for some reason the
2028 // call returned a task (if another request resulted in a successful
2029 // forward this could happen.) Queue the task and reset the pointer.
2030 FLOW_SPAWN(*ft); ft = NULL;
2032 else if(!ft && op_data.status == SUCCEEDED) {
2033 ft = SUCCESSFULLY_ENQUEUED;
2038 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2039 internal::reservable_item_buffer<T, A>::reset();
2040 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2041 my_successors.reset(f);
2043 my_built_predecessors.receiver_extract(*this);
// The forwarder flag is cleared so a fresh forwarder can be scheduled.
2046 forwarder_busy = false;
2049 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) { }
// NOTE(review): lossy extraction -- embedded original-file line numbers with
// gaps; missing lines include "return" statements, closing braces, #else/#endif
// and the class's closing "};". Code left byte-identical; comments only.
2053 //! Forwards messages in FIFO order
// queue_node: a buffer_node whose aggregated operations pop/reserve from the
// front of the buffer, giving first-in-first-out forwarding.
2054 template <typename T, typename A=cache_aligned_allocator<T> >
2055 class queue_node : public buffer_node<T, A> {
2057 typedef buffer_node<T, A> base_type;
2058 typedef typename base_type::size_type size_type;
2059 typedef typename base_type::buffer_operation queue_operation;
2061 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// Try to forward items at the head of the queue to successors; runs inside
// the aggregator, so no extra locking is visible here.
2063 /* override */ void internal_forward_task(queue_operation *op) {
2064 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2065 __TBB_store_with_release(op->status, FAILED);
2066 this->forwarder_busy = false;
// NOTE(review): the early "return" and brace for the guard above fall in
// missing lines 2067-2069; the declaration of i_copy is also missing.
2070 task *last_task = NULL;
2071 size_type counter = this->my_successors.size();
2072 // Keep trying to send items while there is at least one accepting successor
2073 while (counter>0 && this->my_item_valid(this->my_head)) {
2074 this->copy_front(i_copy);
2075 task *new_task = this->my_successors.try_put_task(i_copy);
// Item was accepted (missing line 2076 presumably tested new_task);
// destroy the forwarded front item and accumulate the returned task.
2077 this->destroy_front();
2078 last_task = combine_tasks(last_task, new_task);
2082 op->ltask = last_task;
// Status reflects whether at least one item was forwarded.
2083 if (last_task && !counter)
2084 __TBB_store_with_release(op->status, SUCCEEDED);
2086 __TBB_store_with_release(op->status, FAILED);
2087 this->forwarder_busy = false;
// Pop the front item into op->elem; fails if reserved or empty.
2091 /* override */ void internal_pop(queue_operation *op) {
2092 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2093 __TBB_store_with_release(op->status, FAILED);
2096 this->pop_front(*(op->elem));
2097 __TBB_store_with_release(op->status, SUCCEEDED);
// Reserve (without removing) the front item; at most one reservation at a time.
2100 /* override */ void internal_reserve(queue_operation *op) {
2101 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2102 __TBB_store_with_release(op->status, FAILED);
2105 this->reserve_front(*(op->elem));
2106 __TBB_store_with_release(op->status, SUCCEEDED);
// Consume a previously reserved front item (removes it permanently).
2109 /* override */ void internal_consume(queue_operation *op) {
2110 this->consume_front();
2111 __TBB_store_with_release(op->status, SUCCEEDED);
2115 typedef T input_type;
2116 typedef T output_type;
2117 typedef sender< input_type > predecessor_type;
2118 typedef receiver< output_type > successor_type;
// Constructor: registers the node with the profiling/trace layer.
2121 queue_node( graph &g ) : base_type(g) {
2122 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2123 static_cast<receiver<input_type> *>(this),
2124 static_cast<sender<output_type> *>(this) );
2127 //! Copy constructor
2128 queue_node( const queue_node& src) : base_type(src) {
2129 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2130 static_cast<receiver<input_type> *>(this),
2131 static_cast<sender<output_type> *>(this) );
2134 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Attach a human-readable name for trace tools.
2135 /* override */ void set_name( const char *name ) {
2136 tbb::internal::fgt_node_desc( this, name );
// Reset simply delegates to the buffer_node base.
2140 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2141 base_type::reset(__TBB_PFG_RESET_ARG(f));
// NOTE(review): lossy extraction -- embedded original-file line numbers with
// gaps (missing braces/returns/#endif). Code left byte-identical.
2145 //! Forwards messages in sequence order
// sequencer_node: a queue_node that places each incoming item at the buffer
// slot given by a user-supplied sequencer functor (item -> sequence number),
// so items are emitted in sequence order regardless of arrival order.
2146 template< typename T, typename A=cache_aligned_allocator<T> >
2147 class sequencer_node : public queue_node<T, A> {
// Owned type-erased copy of the user's sequencer functor; deleted in dtor.
2148 internal::function_body< T, size_t > *my_sequencer;
2149 // my_sequencer should be a benign function and must be callable
2150 // from a parallel context. Does this mean it needn't be reset?
2152 typedef T input_type;
2153 typedef T output_type;
2154 typedef sender< input_type > predecessor_type;
2155 typedef receiver< output_type > successor_type;
2158 template< typename Sequencer >
// Constructor: wraps the sequencer in a heap-allocated function_body_leaf
// and registers the node with the trace layer.
2159 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2160 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2161 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2162 static_cast<receiver<input_type> *>(this),
2163 static_cast<sender<output_type> *>(this) );
2166 //! Copy constructor
// Clones the source's sequencer so each node owns its own copy.
2167 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2168 my_sequencer( src.my_sequencer->clone() ) {
2169 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2170 static_cast<receiver<input_type> *>(this),
2171 static_cast<sender<output_type> *>(this) );
2175 ~sequencer_node() { delete my_sequencer; }
2177 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2178 /* override */ void set_name( const char *name ) {
2179 tbb::internal::fgt_node_desc( this, name );
2184 typedef typename buffer_node<T, A>::size_type size_type;
2185 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2187 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// Place an item at the slot computed by the sequencer; fails if an item with
// that tag was already emitted or is already present.
2190 /* override */ void internal_push(sequencer_operation *op) {
2191 size_type tag = (*my_sequencer)(*(op->elem));
2192 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2193 if(tag < this->my_head) {
2194 // have already emitted a message with this tag
2195 __TBB_store_with_release(op->status, FAILED);
// NOTE(review): "return;" + "}" + "#endif" fall in missing lines 2196-2198.
2199 // cannot modify this->my_tail now; the buffer would be inconsistent.
2200 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
// Grow the backing array first if the new logical size exceeds capacity.
2202 if(this->size(new_tail) > this->capacity()) {
2203 this->grow_my_array(this->size(new_tail));
2205 this->my_tail = new_tail;
2206 if(this->place_item(tag,*(op->elem))) {
2207 __TBB_store_with_release(op->status, SUCCEEDED);
2210 // already have a message with this tag
2211 __TBB_store_with_release(op->status, FAILED);
2214 }; // sequencer_node
// NOTE(review): lossy extraction -- embedded original-file line numbers with
// gaps (missing braces, returns, #else/#endif, loop bodies). Code left
// byte-identical; comments only.
2216 //! Forwards messages in priority order
// priority_queue_node: a buffer_node whose backing array is maintained as a
// binary max-heap (per Compare). "mark" is the boundary between the heapified
// prefix [0, mark) and newly pushed, not-yet-heapified elements [mark, my_tail).
2217 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2218 class priority_queue_node : public buffer_node<T, A> {
2220 typedef T input_type;
2221 typedef T output_type;
2222 typedef buffer_node<T,A> base_type;
2223 typedef sender< input_type > predecessor_type;
2224 typedef receiver< output_type > successor_type;
2227 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2228 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2229 static_cast<receiver<input_type> *>(this),
2230 static_cast<sender<output_type> *>(this) );
2233 //! Copy constructor
// Note: mark restarts at 0 in the copy; buffered items are not copied as a heap.
2234 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2235 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2236 static_cast<receiver<input_type> *>(this),
2237 static_cast<sender<output_type> *>(this) );
2240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2241 /* override */ void set_name( const char *name ) {
2242 tbb::internal::fgt_node_desc( this, name );
// Reset delegates to the base; mark reset presumably happens in a missing
// line near 2250 -- TODO confirm against the original file.
2249 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2251 base_type::reset(__TBB_PFG_RESET_ARG(f));
2254 typedef typename buffer_node<T, A>::size_type size_type;
2255 typedef typename buffer_node<T, A>::item_type item_type;
2256 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2258 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// Aggregator drain loop: dispatches each queued operation, then re-heapifies
// and possibly spawns a forwarding task. Overridden (vs buffer_node) so the
// heap invariant can be restored once per batch instead of per operation.
2260 /* override */ void handle_operations(prio_operation *op_list) {
2261 prio_operation *tmp = op_list /*, *pop_list*/ ;
2262 bool try_forwarding=false;
// NOTE(review): the enclosing "while (op_list)" header and "tmp = op_list;"
// fall in missing lines 2263-2264.
2265 op_list = op_list->next;
2266 switch (tmp->type) {
2267 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
2268 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
2269 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
2270 case buffer_node<T, A>::try_fwd_task: internal_forward_task(tmp); break;
2271 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
2272 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
2273 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
2274 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
2275 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2276 case buffer_node<T, A>::add_blt_succ: this->internal_add_built_succ(tmp); break;
2277 case buffer_node<T, A>::del_blt_succ: this->internal_del_built_succ(tmp); break;
2278 case buffer_node<T, A>::add_blt_pred: this->internal_add_built_pred(tmp); break;
2279 case buffer_node<T, A>::del_blt_pred: this->internal_del_built_pred(tmp); break;
2280 case buffer_node<T, A>::blt_succ_cnt: this->internal_succ_cnt(tmp); break;
2281 case buffer_node<T, A>::blt_pred_cnt: this->internal_pred_cnt(tmp); break;
2282 case buffer_node<T, A>::blt_succ_cpy: this->internal_copy_succs(tmp); break;
2283 case buffer_node<T, A>::blt_pred_cpy: this->internal_copy_preds(tmp); break;
2287 // process pops! for now, no special pop processing
2288 // concurrent_priority_queue handles pushes first, then pops.
2289 // that is the genesis of this comment
// Restore the heap invariant over any elements pushed during this batch.
2290 if (mark<this->my_tail) heapify();
2291 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2292 if (try_forwarding && !this->forwarder_busy) { // could we also test for this->my_tail (queue non-empty)?
2293 task* tp = this->my_graph.root_task();
2295 this->forwarder_busy = true;
// Allocate a forwarding task as a child of the graph's root task.
2296 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
2298 < buffer_node<input_type, A> >(*this);
2299 // tmp should point to the last item handled by the aggregator. This is the operation
2300 // the handling thread enqueued. So modifying that record will be okay.
2301 tbb::task *tmp1 = tmp->ltask;
2302 tmp->ltask = combine_tasks(tmp1, new_task);
2307 //! Tries to forward valid items to successors
2308 /* override */ void internal_forward_task(prio_operation *op) {
2310 task * last_task = NULL; // flagged when a successor accepts
2311 size_type counter = this->my_successors.size();
2313 if (this->my_reserved || this->my_tail == 0) {
2314 __TBB_store_with_release(op->status, FAILED);
2315 this->forwarder_busy = false;
// NOTE(review): early return/brace in missing lines 2316-2317; i_copy and
// the prio_copy/accept test are in missing lines around 2320-2327.
2318 // Keep trying to send while there exists an accepting successor
2319 while (counter>0 && this->my_tail > 0) {
2321 task * new_task = this->my_successors.try_put_task(i_copy);
2323 last_task = combine_tasks(last_task, new_task);
2328 op->ltask = last_task;
2329 if (last_task && !counter)
2330 __TBB_store_with_release(op->status, SUCCEEDED);
2332 __TBB_store_with_release(op->status, FAILED);
2333 this->forwarder_busy = false;
// Push always succeeds: prio_push grows the array as needed.
2337 /* override */ void internal_push(prio_operation *op) {
2338 prio_push(*(op->elem));
2339 __TBB_store_with_release(op->status, SUCCEEDED);
2342 /* override */ void internal_pop(prio_operation *op) {
2343 // if empty or already reserved, don't pop
2344 if ( this->my_reserved == true || this->my_tail == 0 ) {
2345 __TBB_store_with_release(op->status, FAILED);
// Copy out the highest-priority item (prio_pop call is in a missing line).
2349 prio_copy(*(op->elem));
2350 __TBB_store_with_release(op->status, SUCCEEDED);
2355 // pops the highest-priority item, saves copy
2356 /* override */ void internal_reserve(prio_operation *op) {
2357 if (this->my_reserved == true || this->my_tail == 0) {
2358 __TBB_store_with_release(op->status, FAILED);
2361 this->my_reserved = true;
2362 prio_copy(*(op->elem));
// Keep a private copy so a later release can push it back into the heap.
2363 reserved_item = *(op->elem);
2364 __TBB_store_with_release(op->status, SUCCEEDED);
// Consume: the reserved item is gone for good; drop the saved copy.
2368 /* override */ void internal_consume(prio_operation *op) {
2369 __TBB_store_with_release(op->status, SUCCEEDED);
2370 this->my_reserved = false;
2371 reserved_item = input_type();
// Release: reservation cancelled; return the saved item to the heap.
2374 /* override */ void internal_release(prio_operation *op) {
2375 __TBB_store_with_release(op->status, SUCCEEDED);
2376 prio_push(reserved_item);
2377 this->my_reserved = false;
2378 reserved_item = input_type();
2384 input_type reserved_item;
2386 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
// True when the newest un-heapified tail element outranks the current root,
// i.e. the tail item is the one to emit/copy.
2387 bool prio_use_tail() {
2388 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2389 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2392 // prio_push: checks that the item will fit, expand array if necessary, put at end
2393 void prio_push(const T &src) {
2394 if ( this->my_tail >= this->my_array_size )
2395 this->grow_my_array( this->my_tail + 1 );
2396 (void) this->place_item(this->my_tail, src);
// NOTE(review): "++my_tail;" appears to be in missing line 2397.
2398 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2401 // prio_pop: deletes highest priority item from the array, and if it is item
2402 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2403 // and mark. Assumes the array has already been tested for emptiness; no failure.
// NOTE(review): the "void prio_pop() {" signature line itself is in missing
// line 2404.
2405 if (prio_use_tail()) {
2406 // there are newly pushed elems; last one higher than top
2408 this->destroy_item(this->my_tail-1);
2410 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
// Otherwise the root (item 0) is the highest-priority item: remove it and
// re-establish the heap by sifting the moved last element down.
2413 this->destroy_item(0);
2414 if(this->my_tail > 1) {
2415 // push the last element down heap
2416 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2417 this->move_item(0,this->my_tail - 1);
2420 if(mark > this->my_tail) --mark;
2421 if (this->my_tail > 1) // don't reheap for heap of size 1
2423 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
// Copy (without removing) the current highest-priority item into res.
2426 void prio_copy(T &res) {
2427 if (prio_use_tail()) {
2428 res = this->get_my_item(this->my_tail - 1);
2431 res = this->get_my_item(0);
2435 // turn array into heap
// heapify(): sift each element in [mark, my_tail) up into the heap prefix.
2437 if(this->my_tail == 0) {
2441 if (!mark) mark = 1;
2442 for (; mark<this->my_tail; ++mark) { // for each unheaped element
2443 size_type cur_pos = mark;
2444 input_type to_place;
2445 this->fetch_item(mark,to_place);
2446 do { // push to_place up the heap
2447 size_type parent = (cur_pos-1)>>1;
2448 if (!compare(this->get_my_item(parent), to_place))
2450 this->move_item(cur_pos, parent);
// NOTE(review): "cur_pos = parent;" and the do-while condition fall in
// missing lines 2451-2452.
2453 (void) this->place_item(cur_pos, to_place);
2457 // otherwise heapified array with new root element; rearrange to heap
// reheap(): sift the root down until both children are lower priority.
2459 size_type cur_pos=0, child=1;
2460 while (child < mark) {
2461 size_type target = child;
// Pick the higher-priority of the two children (bounds test for child+1 is
// in missing line 2462).
2463 compare(this->get_my_item(child),
2464 this->get_my_item(child+1)))
2466 // target now has the higher priority child
2467 if (compare(this->get_my_item(target),
2468 this->get_my_item(cur_pos)))
2471 this->swap_items(cur_pos, target);
2473 child = (cur_pos<<1)+1;
2476 }; // priority_queue_node
// NOTE(review): lossy extraction -- embedded original-file line numbers with
// gaps (missing braces, returns, locals, #endif). Code left byte-identical.
2478 //! Forwards messages only if the threshold has not been reached
2479 /** This node forwards items until its threshold is reached.
2480 It contains no buffering. If the downstream node rejects, the
2481 message is dropped. */
// limiter_node: passes at most my_threshold items downstream between
// decrements. my_count = successful puts so far, my_tries = in-flight put
// attempts; both are guarded by my_mutex.
2482 template< typename T >
2483 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2485 using graph_node::my_graph;
2487 typedef T input_type;
2488 typedef T output_type;
2489 typedef sender< input_type > predecessor_type;
2490 typedef receiver< output_type > successor_type;
2491 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2492 typedef std::vector<successor_type *> successor_vector_type;
2493 typedef std::vector<predecessor_type *> predecessor_vector_type;
2497 size_t my_threshold;
2498 size_t my_count; //number of successful puts
2499 size_t my_tries; //number of active put attempts
2500 internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2501 spin_mutex my_mutex;
2502 internal::broadcast_cache< T > my_successors;
2503 int init_decrement_predecessors;
2505 friend class internal::forward_task_bypass< limiter_node<T> >;
2507 // Let decrementer call decrement_counter()
2508 friend class internal::decrementer< limiter_node<T> >;
// True when another item may be pulled: below threshold and both a
// predecessor and a successor exist.
2510 bool check_conditions() { // always called under lock
2511 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2514 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
// Pull one item from a reserved predecessor and push it downstream; may
// self-schedule another forward task if conditions still hold.
2515 task *forward_task() {
2518 bool reserved = false;
// First critical section: verify conditions and bump my_tries (the
// increment is in a missing line near 2522) before attempting the pull.
2520 spin_mutex::scoped_lock lock(my_mutex);
2521 if ( check_conditions() )
2528 // if we can reserve and can put, we consume the reservation
2529 // we increment the count and decrement the tries
2530 if ( (my_predecessors.try_reserve(v)) == true ){
2532 if ( (rval = my_successors.try_put_task(v)) != NULL ){
// Second critical section: commit the successful put (count/tries
// adjustments are in missing lines 2535-2536).
2534 spin_mutex::scoped_lock lock(my_mutex);
2537 my_predecessors.try_consume();
2538 if ( check_conditions() ) {
2539 task* tp = this->my_graph.root_task();
2541 task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2542 internal::forward_task_bypass< limiter_node<T> >( *this );
2543 FLOW_SPAWN (*rtask);
2551 //if we can't reserve, we decrement the tries
2552 //if we can reserve but can't put, we decrement the tries and release the reservation
// Failure path: undo the attempt under the lock and release any reservation.
2554 spin_mutex::scoped_lock lock(my_mutex);
2556 if (reserved) my_predecessors.try_release();
2557 if ( check_conditions() ) {
2558 task* tp = this->my_graph.root_task();
2560 task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2561 internal::forward_task_bypass< limiter_node<T> >( *this );
2562 __TBB_ASSERT(!rval, "Have two tasks to handle");
// Should be unreachable: every path above returns (returns are in missing
// lines); the assertion guards against fall-through.
2571 __TBB_ASSERT(false, "Should never be called");
// Called by the decrementer port: lowers the count of successful puts and
// immediately tries to forward another item.
2575 task * decrement_counter() {
2577 spin_mutex::scoped_lock lock(my_mutex);
2578 if(my_count) --my_count;
2580 return forward_task();
2584 //! The internal receiver< continue_msg > that decrements the count
2585 internal::decrementer< limiter_node<T> > decrement;
// Constructor: wires owner pointers for the caches and the decrement port,
// then registers with the trace layer.
2588 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
2589 graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2590 init_decrement_predecessors(num_decrement_predecessors),
2591 decrement(num_decrement_predecessors)
2593 my_predecessors.set_owner(this);
2594 my_successors.set_owner(this);
2595 decrement.set_owner(this);
2596 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2597 static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2598 static_cast<sender<output_type> *>(this) );
2601 //! Copy constructor
// Copies configuration (threshold, decrement predecessors) but starts with
// fresh counters and empty caches.
2602 limiter_node( const limiter_node& src ) :
2603 graph_node(src.my_graph), receiver<T>(), sender<T>(),
2604 my_threshold(src.my_threshold), my_count(0), my_tries(0),
2605 init_decrement_predecessors(src.init_decrement_predecessors),
2606 decrement(src.init_decrement_predecessors)
2608 my_predecessors.set_owner(this);
2609 my_successors.set_owner(this);
2610 decrement.set_owner(this);
2611 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2612 static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2613 static_cast<sender<output_type> *>(this) );
2616 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2617 /* override */ void set_name( const char *name ) {
2618 tbb::internal::fgt_node_desc( this, name );
2622 //! Replace the current successor with this new successor
2623 /* override */ bool register_successor( receiver<output_type> &r ) {
2624 spin_mutex::scoped_lock lock(my_mutex);
2625 bool was_empty = my_successors.empty();
2626 my_successors.register_successor(r);
2627 //spawn a forward task if this is the only successor
2628 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2629 task* tp = this->my_graph.root_task();
2631 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2632 internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2638 //! Removes a successor from this node
2639 /** r.remove_predecessor(*this) is also called. */
2640 /* override */ bool remove_successor( receiver<output_type> &r ) {
2641 r.remove_predecessor(*this);
2642 my_successors.remove_successor(r);
2646 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// Edge-introspection plumbing (preview feature): all calls forward directly
// to the successor/predecessor caches.
2647 /*override*/void internal_add_built_successor(receiver<output_type> &src) {
2648 my_successors.internal_add_built_successor(src);
2651 /*override*/void internal_delete_built_successor(receiver<output_type> &src) {
2652 my_successors.internal_delete_built_successor(src);
2655 /*override*/size_t successor_count() { return my_successors.successor_count(); }
2657 /*override*/ void copy_successors(successor_vector_type &v) {
2658 my_successors.copy_successors(v);
2661 /*override*/void internal_add_built_predecessor(sender<output_type> &src) {
2662 my_predecessors.internal_add_built_predecessor(src);
2665 /*override*/void internal_delete_built_predecessor(sender<output_type> &src) {
2666 my_predecessors.internal_delete_built_predecessor(src);
2669 /*override*/size_t predecessor_count() { return my_predecessors.predecessor_count(); }
2671 /*override*/ void copy_predecessors(predecessor_vector_type &v) {
2672 my_predecessors.copy_predecessors(v);
2674 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2676 //! Adds src to the list of cached predecessors.
2677 /* override */ bool register_predecessor( predecessor_type &src ) {
2678 spin_mutex::scoped_lock lock(my_mutex);
2679 my_predecessors.add( src );
2680 task* tp = this->my_graph.root_task();
2681 if ( my_count + my_tries < my_threshold && !my_successors.empty() && tp ) {
2682 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2683 internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2688 //! Removes src from the list of cached predecessors.
2689 /* override */ bool remove_predecessor( predecessor_type &src ) {
2690 my_predecessors.remove( src );
2696 template< typename R, typename B > friend class run_and_put_task;
2697 template<typename X, typename Y> friend class internal::broadcast_cache;
2698 template<typename X, typename Y> friend class internal::round_robin_cache;
2699 //! Puts an item to this receiver
// Accepts an item only while under threshold; on downstream rejection the
// bookkeeping is rolled back (missing lines hold the tries/count updates).
2700 /* override */ task *try_put_task( const T &t ) {
2702 spin_mutex::scoped_lock lock(my_mutex);
2703 if ( my_count + my_tries >= my_threshold )
2709 task * rtask = my_successors.try_put_task(t);
2711 if ( !rtask ) { // try_put_task failed.
2712 spin_mutex::scoped_lock lock(my_mutex);
2714 task* tp = this->my_graph.root_task();
2715 if ( check_conditions() && tp ) {
2716 rtask = new ( task::allocate_additional_child_of( *tp ) )
2717 internal::forward_task_bypass< limiter_node<T> >( *this );
2721 spin_mutex::scoped_lock lock(my_mutex);
// Reset: clears predecessors and the decrement port; counter resets are in
// missing lines near 2729 -- TODO confirm against the original file.
2728 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2730 my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
2731 decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
2732 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2733 my_successors.reset(f);
2737 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); }
2740 #include "internal/_flow_graph_join_impl.h"
2742 using internal::reserving_port;
2743 using internal::queueing_port;
2744 using internal::tag_matching_port;
2745 using internal::input_port;
2746 using internal::tag_value;
2747 using internal::NO_TAG;
2749 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
// join_node<OutputTuple, reserving>: joins one input per tuple element using
// reserving ports; implementation is inherited from unfolded_join_node, the
// ctors here only register with the trace layer. (Lossy extraction: closing
// braces are in missing lines.)
2751 template<typename OutputTuple>
2752 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2754 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2755 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2757 typedef OutputTuple output_type;
2758 typedef typename unfolded_type::input_ports_type input_ports_type;
2759 join_node(graph &g) : unfolded_type(g) {
2760 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2761 this->input_ports(), static_cast< sender< output_type > *>(this) );
2763 join_node(const join_node &other) : unfolded_type(other) {
2764 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2765 this->input_ports(), static_cast< sender< output_type > *>(this) );
2768 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2769 /* override */ void set_name( const char *name ) {
2770 tbb::internal::fgt_node_desc( this, name );
// join_node<OutputTuple, queueing>: same structure as the reserving
// specialization but with queueing ports (inputs are buffered per port).
2776 template<typename OutputTuple>
2777 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2779 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2780 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2782 typedef OutputTuple output_type;
2783 typedef typename unfolded_type::input_ports_type input_ports_type;
2784 join_node(graph &g) : unfolded_type(g) {
2785 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2786 this->input_ports(), static_cast< sender< output_type > *>(this) );
2788 join_node(const join_node &other) : unfolded_type(other) {
2789 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2790 this->input_ports(), static_cast< sender< output_type > *>(this) );
2793 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2794 /* override */ void set_name( const char *name ) {
2795 tbb::internal::fgt_node_desc( this, name );
2801 // template for tag_matching join_node
// join_node<OutputTuple, tag_matching>: each constructor takes one tag-body
// functor per input port (2..__TBB_VARIADIC_MAX inputs, gated by #if blocks
// since variadic templates are not assumed). All ctors just forward to
// unfolded_type and register with the trace layer.
2802 template<typename OutputTuple>
2803 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2804 tag_matching_port, OutputTuple, tag_matching> {
2806 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2807 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
2809 typedef OutputTuple output_type;
2810 typedef typename unfolded_type::input_ports_type input_ports_type;
2812 template<typename __TBB_B0, typename __TBB_B1>
2813 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2814 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2815 this->input_ports(), static_cast< sender< output_type > *>(this) );
2817 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2818 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2819 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2820 this->input_ports(), static_cast< sender< output_type > *>(this) );
2822 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2823 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2824 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2825 this->input_ports(), static_cast< sender< output_type > *>(this) );
2827 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2828 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2829 unfolded_type(g, b0, b1, b2, b3, b4) {
2830 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2831 this->input_ports(), static_cast< sender< output_type > *>(this) );
2833 #if __TBB_VARIADIC_MAX >= 6
2834 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2836 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2837 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2838 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2839 this->input_ports(), static_cast< sender< output_type > *>(this) );
2842 #if __TBB_VARIADIC_MAX >= 7
2843 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2844 typename __TBB_B5, typename __TBB_B6>
2845 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2846 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2847 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2848 this->input_ports(), static_cast< sender< output_type > *>(this) );
2851 #if __TBB_VARIADIC_MAX >= 8
2852 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2853 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2854 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2855 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2856 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2857 this->input_ports(), static_cast< sender< output_type > *>(this) );
2860 #if __TBB_VARIADIC_MAX >= 9
2861 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2862 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2863 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2864 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2865 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2866 this->input_ports(), static_cast< sender< output_type > *>(this) );
2869 #if __TBB_VARIADIC_MAX >= 10
2870 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2871 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2872 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2873 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2874 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2875 this->input_ports(), static_cast< sender< output_type > *>(this) );
2878 join_node(const join_node &other) : unfolded_type(other) {
2879 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2880 this->input_ports(), static_cast< sender< output_type > *>(this) );
2883 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2884 /* override */ void set_name( const char *name ) {
2885 tbb::internal::fgt_node_desc( this, name );
2892 #include "internal/_flow_graph_indexer_impl.h"
// Placeholder type used as the default for unused indexer_node type slots.
2894 struct indexer_null_type {};
// Primary template: specialized below for each supported input arity.
2896 template<typename T0, typename T1=indexer_null_type, typename T2=indexer_null_type, typename T3=indexer_null_type,
2897 typename T4=indexer_null_type, typename T5=indexer_null_type, typename T6=indexer_null_type,
2898 typename T7=indexer_null_type, typename T8=indexer_null_type, typename T9=indexer_null_type> class indexer_node;
2900 //indexer node specializations
// 1-input indexer_node: output is a tagged_msg carrying the input value plus
// the index of the port it arrived on. Behavior inherited from
// unfolded_indexer_node; ctors register with the trace layer.
2901 template<typename T0>
2902 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2904 static const int N = 1;
2906 typedef tuple<T0> InputTuple;
2907 typedef typename internal::tagged_msg<size_t, T0> output_type;
2908 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2909 indexer_node(graph& g) : unfolded_type(g) {
2910 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2911 this->input_ports(), static_cast< sender< output_type > *>(this) );
2914 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2915 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2916 this->input_ports(), static_cast< sender< output_type > *>(this) );
2919 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2920 void set_name( const char *name ) {
2921 tbb::internal::fgt_node_desc( this, name );
// 2-input indexer_node specialization; identical pattern to the 1-input case.
2926 template<typename T0, typename T1>
2927 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2929 static const int N = 2;
2931 typedef tuple<T0, T1> InputTuple;
2932 typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
2933 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2934 indexer_node(graph& g) : unfolded_type(g) {
2935 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2936 this->input_ports(), static_cast< sender< output_type > *>(this) );
2939 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2940 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2941 this->input_ports(), static_cast< sender< output_type > *>(this) );
2944 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2945 void set_name( const char *name ) {
2946 tbb::internal::fgt_node_desc( this, name );
// 3-input indexer_node specialization; identical pattern to the 1-input case.
2951 template<typename T0, typename T1, typename T2>
2952 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2954 static const int N = 3;
2956 typedef tuple<T0, T1, T2> InputTuple;
2957 typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
2958 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2959 indexer_node(graph& g) : unfolded_type(g) {
2960 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2961 this->input_ports(), static_cast< sender< output_type > *>(this) );
2964 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2965 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2966 this->input_ports(), static_cast< sender< output_type > *>(this) );
2969 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2970 void set_name( const char *name ) {
2971 tbb::internal::fgt_node_desc( this, name );
2976 template<typename T0, typename T1, typename T2, typename T3>
2977 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2979 static const int N = 4;
2981 typedef tuple<T0, T1, T2, T3> InputTuple;
2982 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
2983 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2984 indexer_node(graph& g) : unfolded_type(g) {
2985 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2986 this->input_ports(), static_cast< sender< output_type > *>(this) );
2989 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2990 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2991 this->input_ports(), static_cast< sender< output_type > *>(this) );
2994 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2995 /* override */ void set_name( const char *name ) {
2996 tbb::internal::fgt_node_desc( this, name );
3001 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3002 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3004 static const int N = 5;
3006 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3007 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
3008 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3009 indexer_node(graph& g) : unfolded_type(g) {
3010 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3011 this->input_ports(), static_cast< sender< output_type > *>(this) );
3014 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3015 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3016 this->input_ports(), static_cast< sender< output_type > *>(this) );
3019 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3020 /* override */ void set_name( const char *name ) {
3021 tbb::internal::fgt_node_desc( this, name );
#if __TBB_VARIADIC_MAX >= 6
//! Specialization of indexer_node for six input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs an indexer_node belonging to graph g and registers it with the trace layer.
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 6
#if __TBB_VARIADIC_MAX >= 7
//! Specialization of indexer_node for seven input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs an indexer_node belonging to graph g and registers it with the trace layer.
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 7
#if __TBB_VARIADIC_MAX >= 8
//! Specialization of indexer_node for eight input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    static const int N = 8;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs an indexer_node belonging to graph g and registers it with the trace layer.
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 8
#if __TBB_VARIADIC_MAX >= 9
//! Specialization of indexer_node for nine input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs an indexer_node belonging to graph g and registers it with the trace layer.
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 9
#if __TBB_VARIADIC_MAX >= 10
//! Primary template: indexer_node with the maximum of ten input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs an indexer_node belonging to graph g and registers it with the trace layer.
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 10
3165 //! Makes an edge between a single predecessor and a single successor
3166 template< typename T >
3167 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3168 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3169 s.internal_add_built_predecessor(p);
3170 p.internal_add_built_successor(s);
3172 p.register_successor( s );
3173 tbb::internal::fgt_make_edge( &p, &s );
3176 //! Makes an edge between a single predecessor and a single successor
3177 template< typename T >
3178 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3179 p.remove_successor( s );
3180 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3181 // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3182 p.internal_delete_built_successor(s);
3183 s.internal_delete_built_predecessor(p);
3185 tbb::internal::fgt_remove_edge( &p, &s );
3188 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3189 template<typename C >
3190 template< typename S >
3191 void edge_container<C>::sender_extract( S &s ) {
3192 edge_vector e = built_edges;
3193 for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3194 remove_edge(s, **i);
3198 template<typename C >
3199 template< typename R >
3200 void edge_container<C>::receiver_extract( R &r ) {
3201 edge_vector e = built_edges;
3202 for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3203 remove_edge(**i, r);
//! Returns a copy of the body from a function or continue node
/** The Body type must be supplied explicitly, e.g. copy_body<MyBody>(node). */
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
3216 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3217 using interface7::reset_flags;
3218 using interface7::rf_reset_protocol;
3219 using interface7::rf_reset_bodies;
3220 using interface7::rf_extract;
3223 using interface7::graph;
3224 using interface7::graph_node;
3225 using interface7::continue_msg;
3226 using interface7::sender;
3227 using interface7::receiver;
3228 using interface7::continue_receiver;
3230 using interface7::source_node;
3231 using interface7::function_node;
3232 using interface7::multifunction_node;
3233 using interface7::split_node;
3234 using interface7::internal::output_port;
3235 using interface7::indexer_node;
3236 using interface7::internal::tagged_msg;
3237 using interface7::internal::cast_to;
3238 using interface7::internal::is_a;
3239 using interface7::continue_node;
3240 using interface7::overwrite_node;
3241 using interface7::write_once_node;
3242 using interface7::broadcast_node;
3243 using interface7::buffer_node;
3244 using interface7::queue_node;
3245 using interface7::sequencer_node;
3246 using interface7::priority_queue_node;
3247 using interface7::limiter_node;
3248 using namespace interface7::internal::graph_policy_namespace;
3249 using interface7::join_node;
3250 using interface7::input_port;
3251 using interface7::copy_body;
3252 using interface7::make_edge;
3253 using interface7::remove_edge;
3254 using interface7::internal::NO_TAG;
3255 using interface7::internal::tag_value;
3260 #undef __TBB_PFG_RESET_ARG
3263 #endif // __TBB_flow_graph_H