2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
32 #if !TBB_PREVIEW_GRAPH
33 #error Set TBB_PREVIEW_GRAPH to include graph.h
36 #include "tbb_stddef.h"
38 #include "spin_mutex.h"
39 #include "null_mutex.h"
40 #include "spin_rw_mutex.h"
41 #include "null_rw_mutex.h"
43 #include "concurrent_vector.h"
44 #include "_aggregator_internal.h"
46 // use the VC10 or gcc version of tuple if it is available.
47 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
48 #define TBB_PREVIEW_TUPLE 1
49 #include "compat/tuple"
58 \brief The graph related classes and functions
60 There are some applications that best express dependencies as messages
61 passed between nodes in a graph. These messages may contain data or
62 simply act as signals that a predecessors has completed. The graph
63 class and its associated node classes can be used to express such
69 //! The base of all graph nodes. Allows them to be stored in a collection for deletion.
72 virtual ~graph_node() {}
75 //! An empty class used for messages that mean "I'm done"
76 class continue_msg {};
78 template< typename T > class sender;
79 template< typename T > class receiver;
80 class continue_receiver;
82 //! Pure virtual template class that defines a sender of messages of type T
83 template< typename T >
86 //! The output type of this sender
87 typedef T output_type;
89 //! The successor type for this node
90 typedef receiver<T> successor_type;
94 //! Add a new successor to this node
95 virtual bool register_successor( successor_type &r ) = 0;
97 //! Removes a successor from this node
98 virtual bool remove_successor( successor_type &r ) = 0;
100 //! Request an item from the sender
101 virtual bool try_get( T & ) { return false; }
103 //! Reserves an item in the sender
104 virtual bool try_reserve( T & ) { return false; }
106 //! Releases the reserved item
107 virtual bool try_release( ) { return false; }
109 //! Consumes the reserved item
110 virtual bool try_consume( ) { return false; }
115 //! Pure virtual template class that defines a receiver of messages of type T
116 template< typename T >
120 //! The input type of this receiver
121 typedef T input_type;
123 //! The predecessor type for this node
124 typedef sender<T> predecessor_type;
127 virtual ~receiver() {}
129 //! Put an item to the receiver
130 virtual bool try_put( T t ) = 0;
132 //! Add a predecessor to the node
133 virtual bool register_predecessor( predecessor_type & ) { return false; }
135 //! Remove a predecessor from the node
136 virtual bool remove_predecessor( predecessor_type & ) { return false; }
140 //! Base class for receivers of completion messages
141 /** These receivers automatically reset, but cannot be explicitly waited on */
142 class continue_receiver : public receiver< continue_msg > {
146 typedef continue_msg input_type;
148 //! The predecessor type for this node
149 typedef sender< continue_msg > predecessor_type;
152 continue_receiver( int number_of_predecessors = 0 ) {
153 my_predecessor_count = number_of_predecessors;
154 my_current_count = 0;
158 virtual ~continue_receiver() { }
160 //! Increments the trigger threshold
161 /* override */ bool register_predecessor( predecessor_type & ) {
162 spin_mutex::scoped_lock l(my_mutex);
163 ++my_predecessor_count;
167 //! Decrements the trigger threshold
168 /** Does not check to see if the removal of the predecessor now makes the current count
169 exceed the new threshold. So removing a predecessor while the graph is active can cause
170 unexpected results. */
171 /* override */ bool remove_predecessor( predecessor_type & ) {
172 spin_mutex::scoped_lock l(my_mutex);
173 --my_predecessor_count;
177 //! Puts a continue_msg to the receiver
178 /** If the message causes the message count to reach the predecessor count, execute() is called and
179 the message count is reset to 0. Otherwise the message count is incremented. */
180 /* override */ bool try_put( input_type ) {
182 spin_mutex::scoped_lock l(my_mutex);
183 if ( ++my_current_count < my_predecessor_count )
186 my_current_count = 0;
195 int my_predecessor_count;
196 int my_current_count;
198 //! Does whatever should happen when the threshold is reached
199 /** This should be very fast or else spawn a task. This is
200 called while the sender is blocked in the try_put(). */
201 virtual void execute() = 0;
208 //! The state of an executable node
209 enum node_state { node_state_idle=0, node_state_nonidle=1, node_state_inactive=2 };
212 //! A functor that takes no input and generates a value of type Output
213 template< typename Output >
214 class source_body : no_assign {
216 virtual ~source_body() {}
217 virtual bool operator()(Output &output) = 0;
220 //! The leaf for source_body
221 template< typename Output, typename Body>
222 class source_body_leaf : public source_body<Output> {
224 source_body_leaf( Body _body ) : body(_body) { }
225 /*override */ bool operator()(Output &output) { return body( output ); }
230 //! A functor that takes an Input and generates an Output
231 template< typename Input, typename Output >
232 class function_body : no_assign {
234 virtual ~function_body() {}
235 virtual Output operator()(Input input) = 0;
238 //! the leaf for function_body
239 template <typename Input, typename Output, typename B>
240 class function_body_leaf : public function_body< Input, Output > {
242 function_body_leaf( B _body ) : body(_body) { }
243 Output operator()(Input i) { return body(i); }
249 //! the leaf for function_body specialized for Input and output of continue_msg
250 template <typename B>
251 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
253 function_body_leaf( B _body ) : body(_body) { }
254 continue_msg operator()( continue_msg i ) {
263 //! the leaf for function_body specialized for Output of continue_msg
264 template <typename Input, typename B>
265 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
267 function_body_leaf( B _body ) : body(_body) { }
268 continue_msg operator()(Input i) {
270 return continue_msg();
277 //! the leaf for function_body specialized for Input of continue_msg
278 template <typename Output, typename B>
279 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
281 function_body_leaf( B _body ) : body(_body) { }
282 Output operator()(continue_msg i) {
290 //! A task that calls a node's forward function
291 template< typename NodeType >
292 class forward_task : public task {
298 forward_task( NodeType &n ) : my_node(n) {}
306 //! A task that calls a node's apply_body function, passing in an input of type Input
307 template< typename NodeType, typename Input >
308 class apply_body_task : public task {
315 apply_body_task( NodeType &n, Input i ) : my_node(n), my_input(i) {}
318 my_node.apply_body( my_input );
323 //! A task that calls a node's apply_body function with no input
324 template< typename NodeType >
325 class source_task : public task {
331 source_task( NodeType &n ) : my_node(n) {}
334 my_node.apply_body( );
339 //! An empty functor that takes an Input and returns a default constructed Output
340 template< typename Input, typename Output >
342 Output operator()( const Input & ) const { return Output(); }
345 //! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
346 template< typename T, typename M=spin_mutex >
350 typedef size_t size_type;
353 typename my_mutex_type::scoped_lock lock( my_mutex );
354 return internal_empty();
358 typename my_mutex_type::scoped_lock lock( my_mutex );
362 void remove( T &n ) {
363 typename my_mutex_type::scoped_lock lock( my_mutex );
364 for ( size_t i = internal_size(); i != 0; --i ) {
365 T &s = internal_pop();
374 typedef M my_mutex_type;
375 my_mutex_type my_mutex;
376 std::queue< T * > my_q;
378 // Assumes lock is held
379 inline bool internal_empty( ) {
383 // Assumes lock is held
384 inline size_type internal_size( ) {
388 // Assumes lock is held
389 inline void internal_push( T &n ) {
393 // Assumes lock is held
394 inline T &internal_pop() {
402 //! A cache of predecessors that only supports try_get
403 template< typename T, typename M=spin_mutex >
404 class predecessor_cache : public node_cache< sender<T>, M > {
406 typedef M my_mutex_type;
407 typedef T output_type;
408 typedef sender<output_type> predecessor_type;
409 typedef receiver<output_type> successor_type;
411 predecessor_cache( ) : my_owner( NULL ) { }
413 void set_owner( successor_type *owner ) { my_owner = owner; }
415 bool get_item( output_type &v ) {
420 predecessor_type *src;
422 typename my_mutex_type::scoped_lock lock(this->my_mutex);
423 if ( this->internal_empty() ) {
426 src = &this->internal_pop();
429 // Try to get from this sender
430 msg = src->try_get( v );
433 // Relinquish ownership of the edge
435 src->register_successor( *my_owner );
437 // Retain ownership of the edge
440 } while ( msg == false );
445 successor_type *my_owner;
448 //! An cache of predecessors that supports requests and reservations
449 template< typename T, typename M=spin_mutex >
450 class reservable_predecessor_cache : public predecessor_cache< T, M > {
452 typedef M my_mutex_type;
453 typedef T output_type;
454 typedef sender<T> predecessor_type;
455 typedef receiver<T> successor_type;
457 reservable_predecessor_cache( ) : reserved_src(NULL) { }
460 try_reserve( output_type &v ) {
465 typename my_mutex_type::scoped_lock lock(this->my_mutex);
466 if ( reserved_src || this->internal_empty() )
469 reserved_src = &this->internal_pop();
472 // Try to get from this sender
473 msg = reserved_src->try_reserve( v );
476 typename my_mutex_type::scoped_lock lock(this->my_mutex);
477 // Relinquish ownership of the edge
478 reserved_src->register_successor( *this->my_owner );
481 // Retain ownership of the edge
482 this->add( *reserved_src );
484 } while ( msg == false );
491 reserved_src->try_release( );
498 reserved_src->try_consume( );
504 predecessor_type *reserved_src;
508 //! An abstract cache of succesors
509 template<typename T, typename M=spin_rw_mutex >
510 class successor_cache : no_copy {
513 typedef M my_mutex_type;
514 my_mutex_type my_mutex;
516 typedef std::list< receiver<T> * > my_successors_type;
517 my_successors_type my_successors;
523 successor_cache( ) : my_owner(NULL) {}
525 void set_owner( sender<T> *owner ) { my_owner = owner; }
527 virtual ~successor_cache() {}
529 void register_successor( receiver<T> &r ) {
530 typename my_mutex_type::scoped_lock l(my_mutex, true);
531 my_successors.push_back( &r );
534 void remove_successor( receiver<T> &r ) {
535 typename my_mutex_type::scoped_lock l(my_mutex, true);
536 for ( typename my_successors_type::iterator i = my_successors.begin();
537 i != my_successors.end(); ++i ) {
539 my_successors.erase(i);
546 typename my_mutex_type::scoped_lock l(my_mutex, false);
547 return my_successors.empty();
550 virtual bool try_put( T t ) = 0;
553 //! An abstract cache of succesors, specialized to continue_msg
555 class successor_cache< continue_msg > : no_copy {
558 typedef spin_rw_mutex my_mutex_type;
559 my_mutex_type my_mutex;
561 typedef std::list< receiver<continue_msg> * > my_successors_type;
562 my_successors_type my_successors;
564 sender<continue_msg> *my_owner;
568 successor_cache( ) : my_owner(NULL) {}
570 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
572 virtual ~successor_cache() {}
574 void register_successor( receiver<continue_msg> &r ) {
575 my_mutex_type::scoped_lock l(my_mutex, true);
576 my_successors.push_back( &r );
578 r.register_predecessor( *my_owner );
581 void remove_successor( receiver<continue_msg> &r ) {
582 my_mutex_type::scoped_lock l(my_mutex, true);
583 for ( my_successors_type::iterator i = my_successors.begin();
584 i != my_successors.end(); ++i ) {
587 r.remove_predecessor( *my_owner );
588 my_successors.erase(i);
595 my_mutex_type::scoped_lock l(my_mutex, false);
596 return my_successors.empty();
599 virtual bool try_put( continue_msg t ) = 0;
603 //! A cache of successors that are broadcast to
604 template<typename T, typename M=spin_rw_mutex>
605 class broadcast_cache : public successor_cache<T, M> {
606 typedef M my_mutex_type;
607 typedef std::list< receiver<T> * > my_successors_type;
611 broadcast_cache( ) {}
613 bool try_put( T t ) {
615 bool upgraded = false;
616 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
617 typename my_successors_type::iterator i = this->my_successors.begin();
618 while ( i != this->my_successors.end() ) {
619 if ( (*i)->try_put( t ) == true ) {
623 if ( (*i)->register_predecessor(*this->my_owner) ) {
625 l.upgrade_to_writer();
628 i = this->my_successors.erase(i);
639 //! A cache of successors that are put in a round-robin fashion
640 template<typename T, typename M=spin_rw_mutex >
641 class round_robin_cache : public successor_cache<T, M> {
642 typedef size_t size_type;
643 typedef M my_mutex_type;
644 typedef std::list< receiver<T> * > my_successors_type;
648 round_robin_cache( ) {}
651 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
652 return this->my_successors.size();
655 bool try_put( T t ) {
656 bool upgraded = false;
657 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
658 typename my_successors_type::iterator i = this->my_successors.begin();
659 while ( i != this->my_successors.end() ) {
660 if ( (*i)->try_put( t ) ) {
663 if ( (*i)->register_predecessor(*this->my_owner) ) {
665 l.upgrade_to_writer();
668 i = this->my_successors.erase(i);
680 class decrementer : public continue_receiver, internal::no_copy {
685 my_node->decrement_counter();
690 typedef continue_msg input_type;
691 typedef continue_msg output_type;
692 decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
693 void set_owner( T *node ) { my_node = node; }
697 //! @endcond INTERNAL
701 /** This class serves as a handle to the graph */
702 class graph : internal::no_copy {
704 template< typename Body >
705 class run_task : public task {
707 run_task( Body& body ) : my_body(body) {}
716 template< typename Receiver, typename Body >
717 class run_and_put_task : public task {
719 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
721 my_receiver.try_put( my_body() );
725 Receiver &my_receiver;
731 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
732 enum concurrency { unlimited = 0, serial = 1 };
734 //! Constructs a graph withy no nodes.
735 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
736 my_root_task->set_ref_count(1);
739 //! Destroys the graph.
740 /** Calls wait_for_all on the graph, deletes all of the nodes appended by calls to add, and then
741 destroys the root task of the graph. */
744 my_root_task->set_ref_count(0);
745 task::destroy( *my_root_task );
749 //! Used to register that an external entity may still interact with the graph.
750 /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
752 void increment_wait_count() {
754 my_root_task->increment_ref_count();
757 //! Deregisters an external entity that may have interacted with the graph.
758 /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
759 matches the number of increment_wait_count calls. */
760 void decrement_wait_count() {
762 my_root_task->decrement_ref_count();
765 //! Spawns a task that runs a body and puts its output to a specific receiver
766 /** The task is spawned as a child of the graph. This is useful for running tasks
767 that need to block a wait_for_all() on the graph. For example a one-off source. */
768 template< typename Receiver, typename Body >
769 void run( Receiver &r, Body body ) {
770 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
771 run_and_put_task< Receiver, Body >( r, body ) );
774 //! Spawns a task that runs a function object
775 /** The task is spawned as a child of the graph. This is useful for running tasks
776 that need to block a wait_for_all() on the graph. For example a one-off source. */
777 template< typename Body >
778 void run( Body body ) {
779 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
780 run_task< Body >( body ) );
783 //! Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
784 /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
785 void wait_for_all() {
787 my_root_task->wait_for_all();
788 my_root_task->set_ref_count(1);
791 //! Returns the root task of the graph
806 //! Implements methods for a function node that takes a type T as input
807 template< typename Input, typename Output >
808 class function_input : public receiver<Input>, no_assign {
809 typedef sender<Input> predecessor_type;
810 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
811 enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
812 typedef function_input<Input, Output> my_class;
815 //! The input type of this receiver
816 typedef Input input_type;
817 //! The output type of this receiver
818 typedef Output output_type;
820 //! Constructor for function_input
821 template< typename Body >
822 function_input( graph &g, size_t max_concurrency, Body& body )
823 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(internal::node_state_idle),
824 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
825 forwarder_busy(false) {
826 my_predecessors.set_owner(this);
827 my_aggregator.initialize_handler(my_handler(this));
831 virtual ~function_input() { delete my_body; }
834 virtual bool try_put( input_type t ) {
835 if ( my_max_concurrency == 0 ) {
836 spawn_body_task( t );
839 my_operation op_data(t, tryput);
840 my_aggregator.execute(&op_data);
841 return op_data.status == SUCCEEDED;
845 //! Adds src to the list of cached predecessors.
846 /* override */ bool register_predecessor( predecessor_type &src ) {
847 my_operation op_data(reg_pred);
849 my_aggregator.execute(&op_data);
853 //! Removes src from the list of cached predecessors.
854 /* override */ bool remove_predecessor( predecessor_type &src ) {
855 my_operation op_data(rem_pred);
857 my_aggregator.execute(&op_data);
863 const size_t my_max_concurrency;
864 size_t my_concurrency;
865 function_body<input_type, output_type> *my_body;
866 predecessor_cache<input_type, null_mutex > my_predecessors;
868 virtual broadcast_cache<output_type > &successors() = 0;
871 friend class apply_body_task< function_input< input_type, output_type >, input_type >;
872 friend class forward_task< function_input< input_type, output_type > >;
874 class my_operation : public aggregated_operation< my_operation > {
881 my_operation(const input_type& e, op_type t) :
882 type(char(t)), elem(const_cast<input_type*>(&e)) {}
883 my_operation(op_type t) : type(char(t)), r(NULL) {}
887 typedef internal::aggregating_functor<my_class, my_operation> my_handler;
888 friend class internal::aggregating_functor<my_class, my_operation>;
889 aggregator< my_handler, my_operation > my_aggregator;
891 void handle_operations(my_operation *op_list) {
895 op_list = op_list->next;
898 my_predecessors.add(*(tmp->r));
899 __TBB_store_with_release(tmp->status, SUCCEEDED);
900 if (!forwarder_busy) {
901 forwarder_busy = true;
902 spawn_forward_task();
906 my_predecessors.remove(*(tmp->r));
907 __TBB_store_with_release(tmp->status, SUCCEEDED);
910 __TBB_ASSERT(my_max_concurrency != 0, NULL);
912 __TBB_store_with_release(tmp->status, SUCCEEDED);
913 if (my_concurrency<my_max_concurrency) {
915 if (my_predecessors.get_item(i)) {
921 case tryput: internal_try_put(tmp); break;
922 case try_fwd: internal_forward(tmp); break;
928 void internal_try_put(my_operation *op) {
929 __TBB_ASSERT(my_max_concurrency != 0, NULL);
930 if (my_concurrency < my_max_concurrency) {
932 spawn_body_task(*(op->elem));
933 __TBB_store_with_release(op->status, SUCCEEDED);
935 __TBB_store_with_release(op->status, FAILED);
939 //! Tries to spawn bodies if available and if concurrency allows
940 void internal_forward(my_operation *op) {
941 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
943 if (my_predecessors.get_item(i)) {
945 __TBB_store_with_release(op->status, SUCCEEDED);
950 __TBB_store_with_release(op->status, FAILED);
951 forwarder_busy = false;
954 //! Applies the body to the provided input
955 void apply_body( input_type &i ) {
956 successors().try_put( (*my_body)(i) );
957 if ( my_max_concurrency != 0 ) {
958 my_operation op_data(app_body);
959 my_aggregator.execute(&op_data);
963 //! Spawns a task that calls apply_body( input )
964 inline void spawn_body_task( input_type &input ) {
965 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task<function_input<input_type, output_type>, input_type >(*this, input));
968 //! This is executed by an enqueued task, the "forwarder"
970 my_operation op_data(try_fwd);
972 op_data.status = WAIT;
973 my_aggregator.execute(&op_data);
974 } while (op_data.status == SUCCEEDED);
977 //! Spawns a task that calls forward()
978 inline void spawn_forward_task() {
979 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task<function_input<input_type, output_type> >(*this));
983 //! Implements methods for an executable node that takes continue_msg as input
984 template< typename Output >
985 class continue_input : public continue_receiver {
988 //! The input type of this receiver
989 typedef continue_msg input_type;
991 //! The output type of this receiver
992 typedef Output output_type;
994 template< typename Body >
995 continue_input( graph &g, Body& body )
996 : my_root_task(g.root_task()),
997 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
999 template< typename Body >
1000 continue_input( graph &g, int number_of_predecessors, Body& body )
1001 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()),
1002 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
1007 function_body<input_type, output_type> *my_body;
1009 virtual broadcast_cache<output_type > &successors() = 0;
1011 friend class apply_body_task< continue_input< Output >, continue_msg >;
1013 //! Applies the body to the provided input
1014 /* override */ void apply_body( input_type ) {
1015 successors().try_put( (*my_body)( continue_msg() ) );
1018 //! Spawns a task that applies the body
1019 /* override */ void execute( ) {
1020 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1021 apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) );
1025 //! Implements methods for both executable and function nodes that puts Output to its successors
1026 template< typename Output >
1027 class function_output : public sender<Output> {
1030 typedef Output output_type;
1032 function_output() { }
1034 //! Adds a new successor to this node
1035 /* override */ bool register_successor( receiver<output_type> &r ) {
1036 successors().register_successor( r );
1040 //! Removes a successor from this node
1041 /* override */ bool remove_successor( receiver<output_type> &r ) {
1042 successors().remove_successor( r );
1048 virtual broadcast_cache<output_type > &successors() = 0;
1053 //! @endcond INTERNAL
1055 //! An executable node that acts as a source, i.e. it has no predecessors
1056 template < typename Output >
1057 class source_node : public graph_node, public sender< Output > {
1060 //! The type of the output message, which is complete
1061 typedef Output output_type;
1063 //! The type of successors of this node
1064 typedef receiver< Output > successor_type;
1066 //! Constructor for a node with a successor
1067 template< typename Body >
1068 source_node( graph &g, Body body, bool is_active = true )
1069 : my_root_task(g.root_task()), my_state( is_active ? internal::node_state_idle : internal::node_state_inactive ),
1070 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1071 my_reserved(false), my_has_cached_item(false) {
1072 my_successors.set_owner(this);
1076 ~source_node() { delete my_body; }
1078 //! Add a new successor to this node
1079 /* override */ bool register_successor( receiver<output_type> &r ) {
1080 spin_mutex::scoped_lock lock(my_mutex);
1081 my_successors.register_successor(r);
1082 if ( my_state != internal::node_state_inactive )
1087 //! Removes a successor from this node
1088 /* override */ bool remove_successor( receiver<output_type> &r ) {
1089 spin_mutex::scoped_lock lock(my_mutex);
1090 my_successors.remove_successor(r);
1094 //! Request an item from the node
1095 /*override */ bool try_get( output_type &v ) {
1096 spin_mutex::scoped_lock lock(my_mutex);
1100 if ( my_has_cached_item ) {
1102 my_has_cached_item = false;
1103 } else if ( (*my_body)(v) == false ) {
1109 //! Reserves an item.
1110 /* override */ bool try_reserve( output_type &v ) {
1111 spin_mutex::scoped_lock lock(my_mutex);
1112 if ( my_reserved ) {
1116 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
1117 my_has_cached_item = true;
1119 if ( my_has_cached_item ) {
1128 //! Release a reserved item.
1129 /** true = item has been released and so remains in sender, dest must request or reserve future items */
1130 /* override */ bool try_release( ) {
1131 spin_mutex::scoped_lock lock(my_mutex);
1132 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1133 my_reserved = false;
1138 //! Consumes a reserved item
1139 /* override */ bool try_consume( ) {
1140 spin_mutex::scoped_lock lock(my_mutex);
1141 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1142 my_reserved = false;
1143 my_has_cached_item = false;
1144 if ( !my_successors.empty() ) {
1150 //! Activates a node that was created in the inactive state
1152 spin_mutex::scoped_lock lock(my_mutex);
1153 my_state = internal::node_state_idle;
1154 if ( !my_successors.empty() )
1161 spin_mutex my_mutex;
1162 internal::node_state my_state;
1163 internal::source_body<output_type> *my_body;
1164 internal::broadcast_cache< output_type > my_successors;
1166 bool my_has_cached_item;
1167 output_type my_cached_item;
1169 friend class internal::source_task< source_node< output_type > >;
1171 //! Applies the body
1172 /* override */ void apply_body( ) {
1174 if ( try_reserve(v) == false )
1177 if ( my_successors.try_put( v ) )
1183 //! Spawns a task that applies the body
1184 /* override */ void spawn_put( ) {
1185 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1186 internal::source_task< source_node< output_type > >( *this ) );
1191 //! Implements a function node that supports Input -> Output
1192 template <typename Input, typename Output = continue_msg >
1193 class function_node : public graph_node, public internal::function_input<Input,Output>, public internal::function_output<Output> {
1196 typedef Input input_type;
1197 typedef Output output_type;
1198 typedef sender< input_type > predecessor_type;
1199 typedef receiver< output_type > successor_type;
1202 template< typename Body >
1203 function_node( graph &g, size_t concurrency, Body body )
1204 : internal::function_input<input_type,output_type>( g, concurrency, body ) {
1205 my_successors.set_owner(this);
1210 internal::broadcast_cache<output_type> my_successors;
1211 /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
1215 //! Implements an executable node that supports continue_msg -> Output
1216 template <typename Output>
1217 class executable_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
1220 typedef continue_msg input_type;
1221 typedef Output output_type;
1222 typedef sender< input_type > predecessor_type;
1223 typedef receiver< output_type > successor_type;
1225 //! Constructor for executable node with continue_msg -> Output
1226 template <typename Body >
1227 executable_node( graph &g, Body body )
1228 : internal::continue_input<output_type>( g, body ) {
1229 my_successors.set_owner(this);
1232 //! Constructor for executable node with continue_msg -> Output
1233 template <typename Body >
1234 executable_node( graph &g, int number_of_predecessors, Body body )
1235 : internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1236 my_successors.set_owner(this);
1241 internal::broadcast_cache<output_type> my_successors;
1242 /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
1248 template< typename T >
1249 class overwrite_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
1252 typedef T input_type;
1253 typedef T output_type;
1254 typedef sender< input_type > predecessor_type;
1255 typedef receiver< output_type > successor_type;
1257 overwrite_node() : my_buffer_is_valid(false) {
1258 my_successors.set_owner( this );
1261 ~overwrite_node() {}
1263 /* override */ bool register_successor( successor_type &s ) {
1264 spin_mutex::scoped_lock l( my_mutex );
1265 if ( my_buffer_is_valid ) {
1266 // We have a valid value that must be forwarded immediately.
1267 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
1268 // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
1269 my_successors.register_successor( s );
1272 // We don't add the successor: it rejected our put and we became its predecessor instead
1276 // No valid value yet, just add as successor
1277 my_successors.register_successor( s );
1282 /* override */ bool remove_successor( successor_type &s ) {
1283 spin_mutex::scoped_lock l( my_mutex );
1284 my_successors.remove_successor(s);
1288 /* override */ bool try_put( T v ) {
1289 spin_mutex::scoped_lock l( my_mutex );
1291 my_buffer_is_valid = true;
1292 my_successors.try_put(v);
1296 /* override */ bool try_get( T &v ) {
1297 spin_mutex::scoped_lock l( my_mutex );
1298 if ( my_buffer_is_valid ) {
1307 spin_mutex::scoped_lock l( my_mutex );
1308 return my_buffer_is_valid;
1312 spin_mutex::scoped_lock l( my_mutex );
1313 my_buffer_is_valid = false;
1318 spin_mutex my_mutex;
1319 internal::broadcast_cache< T, null_rw_mutex > my_successors;
1321 bool my_buffer_is_valid;
1325 template< typename T >
1326 class write_once_node : public overwrite_node<T> {
1329 typedef T input_type;
1330 typedef T output_type;
1331 typedef sender< input_type > predecessor_type;
1332 typedef receiver< output_type > successor_type;
1334 /* override */ bool try_put( T v ) {
1335 spin_mutex::scoped_lock l( this->my_mutex );
1336 if ( this->my_buffer_is_valid ) {
1339 this->my_buffer = v;
1340 this->my_buffer_is_valid = true;
1341 this->my_successors.try_put(v);
1347 //! Broadcasts completion message when it receives completion messages from all predecessors. Then resets.
1348 /** Is equivalent to an executable_node< continue_msg > with an empty_body */
1349 class continue_node : public executable_node< continue_msg > {
1352 typedef continue_msg input_type;
1353 typedef continue_msg output_type;
1354 typedef sender< input_type > predecessor_type;
1355 typedef receiver< output_type > successor_type;
1357 continue_node( graph &g ) : executable_node<continue_msg>( g, internal::empty_body< continue_msg, continue_msg>() ) {}
1360 //! Forwards messages of type T to all successors
1361 template <typename T>
1362 class broadcast_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
1364 internal::broadcast_cache<T> my_successors;
1368 typedef T input_type;
1369 typedef T output_type;
1370 typedef sender< input_type > predecessor_type;
1371 typedef receiver< output_type > successor_type;
1374 my_successors.set_owner( this );
1377 //! Adds a successor
1378 virtual bool register_successor( receiver<T> &r ) {
1379 my_successors.register_successor( r );
1383 //! Removes s as a successor
1384 virtual bool remove_successor( receiver<T> &r ) {
1385 my_successors.remove_successor( r );
1389 /* override */ bool try_put( T t ) {
1390 my_successors.try_put(t);
1396 #include "_item_buffer.h"
1398 //! Forwards messages in arbitrary order
1399 template <typename T, typename A=cache_aligned_allocator<T> >
1400 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T>, internal::no_copy {
1402 typedef T input_type;
1403 typedef T output_type;
1404 typedef sender< input_type > predecessor_type;
1405 typedef receiver< output_type > successor_type;
1406 typedef buffer_node<T, A> my_class;
1408 typedef size_t size_type;
1409 internal::round_robin_cache< T, null_rw_mutex > my_successors;
1413 friend class internal::forward_task< buffer_node< T, A > >;
1415 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
1416 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1418 // implements the aggregator_operation concept
1419 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1424 buffer_operation(const T& e, op_type t) :
1425 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
1426 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
1429 bool forwarder_busy;
1430 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1431 friend class internal::aggregating_functor<my_class, buffer_operation>;
1432 internal::aggregator< my_handler, buffer_operation> my_aggregator;
1434 virtual void handle_operations(buffer_operation *op_list) {
1435 buffer_operation *tmp;
1436 bool try_forwarding=false;
1439 op_list = op_list->next;
1440 switch (tmp->type) {
1441 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1442 case rem_succ: internal_rem_succ(tmp); break;
1443 case req_item: internal_pop(tmp); break;
1444 case res_item: internal_reserve(tmp); break;
1445 case rel_res: internal_release(tmp); try_forwarding = true; break;
1446 case con_res: internal_consume(tmp); try_forwarding = true; break;
1447 case put_item: internal_push(tmp); try_forwarding = true; break;
1448 case try_fwd: internal_forward(tmp); break;
1451 if (try_forwarding && !forwarder_busy) {
1452 forwarder_busy = true;
1453 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
1457 //! This is executed by an enqueued task, the "forwarder"
1458 virtual void forward() {
1459 buffer_operation op_data(try_fwd);
1461 op_data.status = WAIT;
1462 my_aggregator.execute(&op_data);
1463 } while (op_data.status == SUCCEEDED);
1466 //! Register successor
1467 virtual void internal_reg_succ(buffer_operation *op) {
1468 my_successors.register_successor(*(op->r));
1469 __TBB_store_with_release(op->status, SUCCEEDED);
1472 //! Remove successor
1473 virtual void internal_rem_succ(buffer_operation *op) {
1474 my_successors.remove_successor(*(op->r));
1475 __TBB_store_with_release(op->status, SUCCEEDED);
1478 //! Tries to forward valid items to successors
1479 virtual void internal_forward(buffer_operation *op) {
1481 bool success = false; // flagged when a successor accepts
1482 size_type counter = my_successors.size();
1483 // Try forwarding, giving each successor a chance
1484 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
1485 this->fetch_back(i_copy);
1486 if( my_successors.try_put(i_copy) ) {
1487 this->invalidate_back();
1489 success = true; // found an accepting successor
1493 if (success && !counter)
1494 __TBB_store_with_release(op->status, SUCCEEDED);
1496 __TBB_store_with_release(op->status, FAILED);
1497 forwarder_busy = false;
1501 virtual void internal_push(buffer_operation *op) {
1502 this->push_back(*(op->elem));
1503 __TBB_store_with_release(op->status, SUCCEEDED);
1506 virtual void internal_pop(buffer_operation *op) {
1507 if(this->pop_back(*(op->elem))) {
1508 __TBB_store_with_release(op->status, SUCCEEDED);
1511 __TBB_store_with_release(op->status, FAILED);
1515 virtual void internal_reserve(buffer_operation *op) {
1516 if(this->reserve_front(*(op->elem))) {
1517 __TBB_store_with_release(op->status, SUCCEEDED);
1520 __TBB_store_with_release(op->status, FAILED);
1524 virtual void internal_consume(buffer_operation *op) {
1525 this->consume_front();
1526 __TBB_store_with_release(op->status, SUCCEEDED);
1529 virtual void internal_release(buffer_operation *op) {
1530 this->release_front();
1531 __TBB_store_with_release(op->status, SUCCEEDED);
1536 buffer_node( graph &g ) : reservable_item_buffer<T>(),
1537 my_parent( g.root_task() ), forwarder_busy(false) {
1538 my_successors.set_owner(this);
1539 my_aggregator.initialize_handler(my_handler(this));
1542 virtual ~buffer_node() {}
1545 // message sender implementation
1548 //! Adds a new successor.
1549 /** Adds successor r to the list of successors; may forward tasks. */
1550 /* override */ bool register_successor( receiver<output_type> &r ) {
1551 buffer_operation op_data(reg_succ);
1553 my_aggregator.execute(&op_data);
1557 //! Removes a successor.
1558 /** Removes successor r from the list of successors.
1559 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1560 /* override */ bool remove_successor( receiver<output_type> &r ) {
1561 r.remove_predecessor(*this);
1562 buffer_operation op_data(rem_succ);
1564 my_aggregator.execute(&op_data);
1568 //! Request an item from the buffer_node
1569 /** true = v contains the returned item<BR>
1570 false = no item has been returned */
1571 /* override */ bool try_get( T &v ) {
1572 buffer_operation op_data(req_item);
1574 my_aggregator.execute(&op_data);
1575 return (op_data.status==SUCCEEDED);
1578 //! Reserves an item.
1579 /** false = no item can be reserved<BR>
1580 true = an item is reserved */
1581 /* override */ bool try_reserve( T &v ) {
1582 buffer_operation op_data(res_item);
1584 my_aggregator.execute(&op_data);
1585 return (op_data.status==SUCCEEDED);
1588 //! Release a reserved item.
1589 /** true = item has been released and so remains in sender */
1590 /* override */ bool try_release() {
1591 buffer_operation op_data(rel_res);
1592 my_aggregator.execute(&op_data);
1596 //! Consumes a reserved item.
1597 /** true = item is removed from sender and reservation removed */
1598 /* override */ bool try_consume() {
1599 buffer_operation op_data(con_res);
1600 my_aggregator.execute(&op_data);
1605 /** true is always returned */
1606 /* override */ bool try_put(T t) {
1607 buffer_operation op_data(t, put_item);
1608 my_aggregator.execute(&op_data);
1614 //! Forwards messages in FIFO order
1615 template <typename T, typename A=cache_aligned_allocator<T> >
1616 class queue_node : public buffer_node<T, A> {
1618 typedef typename buffer_node<T, A>::size_type size_type;
1619 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
1621 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1623 //! Tries to forward valid items to successors
1624 /* override */ void internal_forward(queue_operation *op) {
1626 bool success = false; // flagged when a successor accepts
1627 size_type counter = this->my_successors.size();
1628 if (this->my_reserved || !this->item_valid(this->my_head)){
1629 __TBB_store_with_release(op->status, FAILED);
1630 this->forwarder_busy = false;
1633 // Keep trying to send items while there is at least one accepting successor
1634 while (counter>0 && this->item_valid(this->my_head)) {
1635 this->fetch_front(i_copy);
1636 if(this->my_successors.try_put(i_copy)) {
1637 this->invalidate_front();
1639 success = true; // found an accepting successor
1643 if (success && !counter)
1644 __TBB_store_with_release(op->status, SUCCEEDED);
1646 __TBB_store_with_release(op->status, FAILED);
1647 this->forwarder_busy = false;
1651 /* override */ void internal_pop(queue_operation *op) {
1652 if ( this->my_reserved || !this->item_valid(this->my_head)){
1653 __TBB_store_with_release(op->status, FAILED);
1656 this->pop_front(*(op->elem));
1657 __TBB_store_with_release(op->status, SUCCEEDED);
1660 /* override */ void internal_reserve(queue_operation *op) {
1661 if (this->my_reserved || !this->item_valid(this->my_head)) {
1662 __TBB_store_with_release(op->status, FAILED);
1665 this->my_reserved = true;
1666 this->fetch_front(*(op->elem));
1667 this->invalidate_front();
1668 __TBB_store_with_release(op->status, SUCCEEDED);
1671 /* override */ void internal_consume(queue_operation *op) {
1672 this->consume_front();
1673 __TBB_store_with_release(op->status, SUCCEEDED);
1678 typedef T input_type;
1679 typedef T output_type;
1680 typedef sender< input_type > predecessor_type;
1681 typedef receiver< output_type > successor_type;
1684 queue_node( graph &g ) : buffer_node<T, A>(g) {}
1687 //! Forwards messages in sequence order
1688 template< typename T, typename A=cache_aligned_allocator<T> >
1689 class sequencer_node : public queue_node<T, A> {
1690 internal::function_body< T, size_t > *my_sequencer;
1693 typedef T input_type;
1694 typedef T output_type;
1695 typedef sender< input_type > predecessor_type;
1696 typedef receiver< output_type > successor_type;
1699 template< typename Sequencer >
1700 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
1701 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
1704 ~sequencer_node() { delete my_sequencer; }
1706 typedef typename buffer_node<T, A>::size_type size_type;
1707 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
1709 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1712 /* override */ void internal_push(sequencer_operation *op) {
1713 size_type tag = (*my_sequencer)(*(op->elem));
1715 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1717 if(this->size() > this->capacity())
1718 this->grow_my_array(this->size()); // tail already has 1 added to it
1719 this->item(tag) = std::make_pair( *(op->elem), true );
1720 __TBB_store_with_release(op->status, SUCCEEDED);
1724 //! Forwards messages in priority order
1725 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
1726 class priority_queue_node : public buffer_node<T, A> {
1728 typedef T input_type;
1729 typedef T output_type;
1730 typedef sender< input_type > predecessor_type;
1731 typedef receiver< output_type > successor_type;
1734 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
1737 typedef typename buffer_node<T, A>::size_type size_type;
1738 typedef typename buffer_node<T, A>::item_type item_type;
1739 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
1741 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1743 /* override */ void handle_operations(prio_operation *op_list) {
1744 prio_operation *tmp /*, *pop_list*/ ;
1745 bool try_forwarding=false;
1748 op_list = op_list->next;
1749 switch (tmp->type) {
1750 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
1751 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
1752 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
1753 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
1754 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
1755 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
1756 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
1757 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
1760 // process pops! for now, no special pop processing
1761 if (mark<this->my_tail) heapify();
1762 if (try_forwarding && !this->forwarder_busy) {
1763 this->forwarder_busy = true;
1764 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
1768 //! Tries to forward valid items to successors
1769 /* override */ void internal_forward(prio_operation *op) {
1771 bool success = false; // flagged when a successor accepts
1772 size_type counter = this->my_successors.size();
1774 if (this->my_reserved || this->my_tail == 0) {
1775 __TBB_store_with_release(op->status, FAILED);
1776 this->forwarder_busy = false;
1779 // Keep trying to send while there exists an accepting successor
1780 while (counter>0 && this->my_tail > 0) {
1781 i_copy = this->my_array[0].first;
1782 bool msg = this->my_successors.try_put(i_copy);
1783 if ( msg == true ) {
1784 if (mark == this->my_tail) --mark;
1786 this->my_array[0].first=this->my_array[this->my_tail].first;
1787 if (this->my_tail > 1) // don't reheap for heap of size 1
1789 success = true; // found an accepting successor
1793 if (success && !counter)
1794 __TBB_store_with_release(op->status, SUCCEEDED);
1796 __TBB_store_with_release(op->status, FAILED);
1797 this->forwarder_busy = false;
1801 /* override */ void internal_push(prio_operation *op) {
1802 if ( this->my_tail >= this->my_array_size )
1803 this->grow_my_array( this->my_tail + 1 );
1804 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
1806 __TBB_store_with_release(op->status, SUCCEEDED);
1808 /* override */ void internal_pop(prio_operation *op) {
1809 if ( this->my_reserved == true || this->my_tail == 0 ) {
1810 __TBB_store_with_release(op->status, FAILED);
1813 if (mark<this->my_tail &&
1814 compare(this->my_array[0].first,
1815 this->my_array[this->my_tail-1].first)) {
1816 // there are newly pushed elems; last one higher than top
1818 *(op->elem) = this->my_array[this->my_tail-1].first;
1820 __TBB_store_with_release(op->status, SUCCEEDED);
1822 else { // extract and push the last element down heap
1823 *(op->elem) = this->my_array[0].first; // copy the data
1824 if (mark == this->my_tail) --mark;
1826 __TBB_store_with_release(op->status, SUCCEEDED);
1827 this->my_array[0].first=this->my_array[this->my_tail].first;
1828 if (this->my_tail > 1) // don't reheap for heap of size 1
1833 /* override */ void internal_reserve(prio_operation *op) {
1834 if (this->my_reserved == true || this->my_tail == 0) {
1835 __TBB_store_with_release(op->status, FAILED);
1838 this->my_reserved = true;
1839 *(op->elem) = reserved_item = this->my_array[0].first;
1840 if (mark == this->my_tail) --mark;
1842 __TBB_store_with_release(op->status, SUCCEEDED);
1843 this->my_array[0].first = this->my_array[this->my_tail].first;
1844 if (this->my_tail > 1) // don't reheap for heap of size 1
1848 /* override */ void internal_consume(prio_operation *op) {
1849 this->my_reserved = false;
1850 __TBB_store_with_release(op->status, SUCCEEDED);
1852 /* override */ void internal_release(prio_operation *op) {
1853 if (this->my_tail >= this->my_array_size)
1854 this->grow_my_array( this->my_tail + 1 );
1855 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
1857 this->my_reserved = false;
1858 __TBB_store_with_release(op->status, SUCCEEDED);
1864 input_type reserved_item;
1867 if (!mark) mark = 1;
1868 for (; mark<this->my_tail; ++mark) { // for each unheaped element
1869 size_type cur_pos = mark;
1870 input_type to_place = this->my_array[mark].first;
1871 do { // push to_place up the heap
1872 size_type parent = (cur_pos-1)>>1;
1873 if (!compare(this->my_array[parent].first, to_place))
1875 this->my_array[cur_pos].first = this->my_array[parent].first;
1878 this->my_array[cur_pos].first = to_place;
1883 size_type cur_pos=0, child=1;
1884 while (child < mark) {
1885 size_type target = child;
1887 compare(this->my_array[child].first,
1888 this->my_array[child+1].first))
1890 // target now has the higher priority child
1891 if (compare(this->my_array[target].first,
1892 this->my_array[this->my_tail].first))
1894 this->my_array[cur_pos].first = this->my_array[target].first;
1896 child = (cur_pos<<1)+1;
1898 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
1902 //! Forwards messages only if the threshold has not been reached
1903 /** This node forwards items until its threshold is reached.
1904 It contains no buffering. If the downstream node rejects, the
1905 message is dropped. */
1906 template< typename T >
1907 class limiter_node : public graph_node, public receiver< T >, public sender< T >, internal::no_copy {
1910 typedef T input_type;
1911 typedef T output_type;
1912 typedef sender< input_type > predecessor_type;
1913 typedef receiver< output_type > successor_type;
1918 size_t my_threshold;
1920 internal::predecessor_cache< T > my_predecessors;
1921 spin_mutex my_mutex;
1922 internal::broadcast_cache< T > my_successors;
1924 friend class internal::forward_task< limiter_node<T> >;
1926 // Let decrementer call decrement_counter()
1927 friend class internal::decrementer< limiter_node<T> >;
1929 void decrement_counter() {
1932 // If we can't get / put an item immediately then drop the count
1933 if ( my_predecessors.get_item( v ) == false
1934 || my_successors.try_put(v) == false ) {
1935 spin_mutex::scoped_lock lock(my_mutex);
1937 if ( !my_predecessors.empty() )
1938 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1939 internal::forward_task< limiter_node<T> >( *this ) );
1945 spin_mutex::scoped_lock lock(my_mutex);
1946 if ( my_count < my_threshold )
1951 decrement_counter();
1956 //! The internal receiver< continue_msg > that decrements the count
1957 internal::decrementer< limiter_node<T> > decrement;
1960 limiter_node( graph &g, size_t threshold, int number_of_decrement_predecessors = 0 ) :
1961 my_root_task(g.root_task()), my_threshold(threshold), my_count(0), decrement(number_of_decrement_predecessors) {
1962 my_predecessors.set_owner(this);
1963 my_successors.set_owner(this);
1964 decrement.set_owner(this);
1967 //! Replace the current successor with this new successor
1968 /* override */ bool register_successor( receiver<output_type> &r ) {
1969 my_successors.register_successor(r);
1973 //! Removes a successor from this node
1974 /** r.remove_predecessor(*this) is also called. */
1975 /* override */ bool remove_successor( receiver<output_type> &r ) {
1976 r.remove_predecessor(*this);
1977 my_successors.remove_successor(r);
1981 //! Puts an item to this receiver
1982 /* override */ bool try_put( T t ) {
1984 spin_mutex::scoped_lock lock(my_mutex);
1985 if ( my_count >= my_threshold )
1991 bool msg = my_successors.try_put(t);
1993 if ( msg != true ) {
1994 spin_mutex::scoped_lock lock(my_mutex);
1996 if ( !my_predecessors.empty() )
1997 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1998 internal::forward_task< limiter_node<T> >( *this ) );
2004 //! Removes src from the list of cached predecessors.
2005 /* override */ bool register_predecessor( predecessor_type &src ) {
2006 spin_mutex::scoped_lock lock(my_mutex);
2007 my_predecessors.add( src );
2008 if ( my_count < my_threshold && !my_successors.empty() )
2009 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
2010 internal::forward_task< limiter_node<T> >( *this ) );
2014 //! Removes src from the list of cached predecessors.
2015 /* override */ bool remove_predecessor( predecessor_type &src ) {
2016 my_predecessors.remove( src );
2022 namespace internal {
2024 struct forwarding_base {
2025 forwarding_base(task *rt) : my_root_task(rt) {}
2026 virtual ~forwarding_base() {}
2027 virtual void decrement_port_count() = 0;
2028 virtual void increment_port_count() = 0;
2029 // moved here so input ports can queue tasks
2034 struct join_helper {
2036 template< typename TupleType, typename PortType >
2037 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
2038 std::get<N-1>( my_input ).set_join_node_pointer(port);
2039 join_helper<N-1>::set_join_node_pointer( my_input, port );
2041 template< typename TupleType >
2042 static inline void consume_reservations( TupleType &my_input ) {
2043 std::get<N-1>( my_input ).consume();
2044 join_helper<N-1>::consume_reservations( my_input );
2047 template< typename TupleType >
2048 static inline void release_my_reservation( TupleType &my_input ) {
2049 std::get<N-1>( my_input ).release();
2052 template <typename TupleType>
2053 static inline void release_reservations( TupleType &my_input) {
2054 join_helper<N-1>::release_reservations(my_input);
2055 release_my_reservation(my_input);
2058 template< typename InputTuple, typename OutputTuple >
2059 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
2060 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
2061 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
2062 release_my_reservation( my_input );
2068 template<typename InputTuple, typename OutputTuple>
2069 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
2070 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
2071 return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
2074 template<typename InputTuple, typename OutputTuple>
2075 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
2076 return get_my_item(my_input, out);
2079 template<typename InputTuple>
2080 static inline void reset_my_port(InputTuple &my_input) {
2081 join_helper<N-1>::reset_my_port(my_input);
2082 std::get<N-1>(my_input).reset_port();
2085 template<typename InputTuple>
2086 static inline void reset_ports(InputTuple& my_input) {
2087 reset_my_port(my_input);
2092 struct join_helper<1> {
2094 template< typename TupleType, typename PortType >
2095 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
2096 std::get<0>( my_input ).set_join_node_pointer(port);
2099 template< typename TupleType >
2100 static inline void consume_reservations( TupleType &my_input ) {
2101 std::get<0>( my_input ).consume();
2104 template< typename TupleType >
2105 static inline void release_my_reservation( TupleType &my_input ) {
2106 std::get<0>( my_input ).release();
2109 template<typename TupleType>
2110 static inline void release_reservations( TupleType &my_input) {
2111 release_my_reservation(my_input);
2114 template< typename InputTuple, typename OutputTuple >
2115 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
2116 return std::get<0>( my_input ).reserve( std::get<0>( out ) );
2119 template<typename InputTuple, typename OutputTuple>
2120 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
2121 return std::get<0>(my_input).get_item(std::get<0>(out));
2124 template<typename InputTuple, typename OutputTuple>
2125 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
2126 return get_my_item(my_input, out);
2129 template<typename InputTuple>
2130 static inline void reset_my_port(InputTuple &my_input) {
2131 std::get<0>(my_input).reset_port();
2134 template<typename InputTuple>
2135 static inline void reset_ports(InputTuple& my_input) {
2136 reset_my_port(my_input);
2140 namespace join_policy_namespace {
2141 enum join_policy { reserving
2145 using namespace join_policy_namespace;
2147 //! The two-phase join port
2148 template< typename T >
2149 class reserving_port : public receiver<T> {
2151 typedef T input_type;
2152 typedef sender<T> predecessor_type;
2154 // ----------- Aggregator ------------
2155 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res };
2156 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2157 typedef reserving_port<T> my_class;
2159 class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
2164 predecessor_type *my_pred;
2166 reserving_port_operation(const T& e, op_type t) :
2167 type(char(t)), my_arg(const_cast<T*>(&e)) {}
2168 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
2169 my_pred(const_cast<predecessor_type *>(&s)) {}
2170 reserving_port_operation(op_type t) : type(char(t)) {}
2173 typedef internal::aggregating_functor<my_class, reserving_port_operation> my_handler;
2174 friend class internal::aggregating_functor<my_class, reserving_port_operation>;
2175 aggregator<my_handler, reserving_port_operation> my_aggregator;
2177 void handle_operations(reserving_port_operation* op_list) {
2178 reserving_port_operation *current;
2179 bool no_predecessors;
2182 op_list = op_list->next;
2183 switch(current->type) {
2185 no_predecessors = my_predecessors.empty();
2186 my_predecessors.add(*(current->my_pred));
2187 if ( no_predecessors ) {
2188 my_join->decrement_port_count( ); // may try to forward
2190 __TBB_store_with_release(current->status, SUCCEEDED);
2193 my_predecessors.remove(*(current->my_pred));
2194 if(my_predecessors.empty()) my_join->increment_port_count();
2195 __TBB_store_with_release(current->status, SUCCEEDED);
2199 __TBB_store_with_release(current->status, FAILED);
2201 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
2203 __TBB_store_with_release(current->status, SUCCEEDED);
2205 if ( my_predecessors.empty() ) {
2206 my_join->increment_port_count();
2208 __TBB_store_with_release(current->status, FAILED);
2213 my_predecessors.try_release( );
2214 __TBB_store_with_release(current->status, SUCCEEDED);
2218 my_predecessors.try_consume( );
2219 __TBB_store_with_release(current->status, SUCCEEDED);
2228 reserving_port() : reserved(false) {
2230 my_predecessors.set_owner( this );
2231 my_aggregator.initialize_handler(my_handler(this));
2235 reserving_port(const reserving_port& /* other */) : receiver<T>() {
2238 my_predecessors.set_owner( this );
2239 my_aggregator.initialize_handler(my_handler(this));
2242 void set_join_node_pointer(forwarding_base *join) {
2246 // always rejects, so arc is reversed (and reserves can be done.)
2251 //! Add a predecessor
2252 bool register_predecessor( sender<T> &src ) {
2253 reserving_port_operation op_data(src, reg_pred);
2254 my_aggregator.execute(&op_data);
2255 return op_data.status == SUCCEEDED;
2258 //! Remove a predecessor
2259 bool remove_predecessor( sender<T> &src ) {
2260 reserving_port_operation op_data(src, rem_pred);
2261 my_aggregator.execute(&op_data);
2262 return op_data.status == SUCCEEDED;
2265 //! Reserve an item from the port
2266 bool reserve( T &v ) {
2267 reserving_port_operation op_data(v, res_item);
2268 my_aggregator.execute(&op_data);
2269 return op_data.status == SUCCEEDED;
2272 //! Release the port
2274 reserving_port_operation op_data(rel_res);
2275 my_aggregator.execute(&op_data);
2278 //! Complete use of the port
2280 reserving_port_operation op_data(con_res);
2281 my_aggregator.execute(&op_data);
2285 forwarding_base *my_join;
2286 reservable_predecessor_cache< T, null_mutex > my_predecessors;
2290 //! queueing join_port
2291 template<typename T>
2292 class queueing_port : public receiver<T>, public item_buffer<T> {
2294 typedef T input_type;
2295 typedef sender<T> predecessor_type;
2296 typedef queueing_port<T> my_node_type;
2298 // ----------- Aggregator ------------
2300 enum op_type { try__put, get__item, res_port };
2301 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2302 typedef queueing_port<T> my_class;
2304 class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
2311 // constructor for value parameter
2312 queueing_port_operation(const T& e, op_type t) :
2313 // type(char(t)), my_val(const_cast<T>(e)) {}
2314 type(char(t)), my_val(e) {}
2315 // constructor for pointer parameter
2316 queueing_port_operation(const T* p, op_type t) :
2317 type(char(t)), my_arg(const_cast<T*>(p)) {}
2318 // constructor with no parameter
2319 queueing_port_operation(op_type t) : type(char(t)) {}
2322 typedef internal::aggregating_functor<my_class, queueing_port_operation> my_handler;
2323 friend class internal::aggregating_functor<my_class, queueing_port_operation>;
2324 aggregator<my_handler, queueing_port_operation> my_aggregator;
2326 void handle_operations(queueing_port_operation* op_list) {
2327 queueing_port_operation *current;
2331 op_list = op_list->next;
2332 switch(current->type) {
2334 was_empty = this->buffer_empty();
2335 this->push_back(current->my_val);
2336 if (was_empty) my_join->decrement_port_count();
2337 __TBB_store_with_release(current->status, SUCCEEDED);
2340 if(!this->buffer_empty()) {
2341 this->fetch_front(*(current->my_arg));
2342 __TBB_store_with_release(current->status, SUCCEEDED);
2345 __TBB_store_with_release(current->status, FAILED);
2349 __TBB_ASSERT(this->item_valid(this->my_head), "No item to reset");
2350 this->invalidate_front(); ++(this->my_head);
2351 if(this->item_valid(this->my_head)) {
2352 my_join->decrement_port_count();
2354 __TBB_store_with_release(current->status, SUCCEEDED);
2359 // ------------ End Aggregator ---------------
2363 queueing_port() : item_buffer<T>() {
2365 my_aggregator.initialize_handler(my_handler(this));
2368 //! copy constructor
2369 queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
2371 my_aggregator.initialize_handler(my_handler(this));
2374 //! record parent for tallying available items
2375 void set_join_node_pointer(forwarding_base *join) {
2379 /*override*/bool try_put(T v) {
2380 queueing_port_operation op_data(v, try__put);
2381 my_aggregator.execute(&op_data);
2382 return op_data.status == SUCCEEDED;
2386 bool get_item( T &v ) {
2387 queueing_port_operation op_data(&v, get__item);
2388 my_aggregator.execute(&op_data);
2389 return op_data.status == SUCCEEDED;
2392 // reset_port is called when item is accepted by successor, but
2393 // is initiated by join_node.
2395 queueing_port_operation op_data(res_port);
2396 my_aggregator.execute(&op_data);
2401 forwarding_base *my_join;
2404 template<join_policy JP, typename InputTuple, typename OutputTuple>
2405 class join_node_base;
2407 //! join_node_FE : implements input port policy
2408 template<join_policy JP, typename InputTuple, typename OutputTuple>
2411 template<typename InputTuple, typename OutputTuple>
2412 class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
2414 static const int N = std::tuple_size<OutputTuple>::value;
2415 typedef OutputTuple output_type;
2416 typedef InputTuple input_type;
2417 typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_type; // for forwarding
2419 join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
2420 ports_with_no_inputs = N;
2421 join_helper<N>::set_join_node_pointer(my_inputs, this);
2424 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
2426 void increment_port_count() {
2427 ++ports_with_no_inputs;
2430 // if all input_ports have predecessors, spawn forward to try and consume tuples
2431 void decrement_port_count() {
2432 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
2433 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
2434 forward_task<my_node_type>(*my_node) );
2438 input_type &inputs() { return my_inputs; }
2440 // all methods on input ports should be called under mutual exclusion from join_node_base.
2442 bool tuple_build_may_succeed() {
2443 return !ports_with_no_inputs;
2446 bool try_to_make_tuple(output_type &out) {
2447 if(ports_with_no_inputs) return false;
2448 return join_helper<N>::reserve(my_inputs, out);
2451 void tuple_accepted() {
2452 join_helper<N>::consume_reservations(my_inputs);
2454 void tuple_rejected() {
2455 join_helper<N>::release_reservations(my_inputs);
2458 input_type my_inputs;
2459 my_node_type *my_node;
2460 atomic<size_t> ports_with_no_inputs;
2463 template<typename InputTuple, typename OutputTuple>
2464 class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
2466 static const int N = std::tuple_size<OutputTuple>::value;
2467 typedef OutputTuple output_type;
2468 typedef InputTuple input_type;
2469 typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_type; // for forwarding
2471 join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
2472 ports_with_no_items = N;
2473 join_helper<N>::set_join_node_pointer(my_inputs, this);
2476 // needed for forwarding
2477 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
2479 void reset_port_count() {
2480 ports_with_no_items = N;
2483 // if all input_ports have items, spawn forward to try and consume tuples
2484 void decrement_port_count() {
2485 if(ports_with_no_items.fetch_and_decrement() == 1) {
2486 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
2487 forward_task<my_node_type>(*my_node) );
2491 void increment_port_count() { __TBB_ASSERT(false, NULL); } // should never be called
2493 input_type &inputs() { return my_inputs; }
2495 // all methods on input ports should be called under mutual exclusion from join_node_base.
2497 bool tuple_build_may_succeed() {
2498 return !ports_with_no_items;
2501 bool try_to_make_tuple(output_type &out) {
2502 if(ports_with_no_items) return false;
2503 return join_helper<N>::get_items(my_inputs, out);
2506 void tuple_accepted() {
2508 join_helper<N>::reset_ports(my_inputs);
2510 void tuple_rejected() {
2514 input_type my_inputs;
2515 my_node_type *my_node;
2516 atomic<size_t> ports_with_no_items;
2520 template<join_policy JP, typename InputTuple, typename OutputTuple>
2521 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
2522 public sender<OutputTuple>, no_copy {
2524 typedef OutputTuple output_type;
2526 typedef receiver<output_type> successor_type;
2527 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
2528 using input_ports_type::tuple_build_may_succeed;
2529 using input_ports_type::try_to_make_tuple;
2530 using input_ports_type::tuple_accepted;
2531 using input_ports_type::tuple_rejected;
2534 // ----------- Aggregator ------------
2535 enum op_type { reg_succ, rem_succ, try__get, do_fwrd };
2536 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2537 typedef join_node_base<JP,InputTuple,OutputTuple> my_class;
2539 class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
2543 output_type *my_arg;
2544 successor_type *my_succ;
2546 join_node_base_operation(const output_type& e, op_type t) :
2547 type(char(t)), my_arg(const_cast<output_type*>(&e)) {}
2548 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
2549 my_succ(const_cast<successor_type *>(&s)) {}
2550 join_node_base_operation(op_type t) : type(char(t)) {}
2553 typedef internal::aggregating_functor<my_class, join_node_base_operation> my_handler;
2554 friend class internal::aggregating_functor<my_class, join_node_base_operation>;
2555 bool forwarder_busy;
2556 aggregator<my_handler, join_node_base_operation> my_aggregator;
2558 void handle_operations(join_node_base_operation* op_list) {
2559 join_node_base_operation *current;
2562 op_list = op_list->next;
2563 switch(current->type) {
2565 my_successors.register_successor(*(current->my_succ));
2566 if(tuple_build_may_succeed() && !forwarder_busy) {
2567 task::enqueue( * new ( task::allocate_additional_child_of(*(this->my_root_task)) )
2568 forward_task<join_node_base<JP,InputTuple,OutputTuple> >(*this));
2569 forwarder_busy = true;
2571 __TBB_store_with_release(current->status, SUCCEEDED);
2574 my_successors.remove_successor(*(current->my_succ));
2575 __TBB_store_with_release(current->status, SUCCEEDED);
2578 if(tuple_build_may_succeed()) {
2579 if(try_to_make_tuple(*(current->my_arg))) {
2581 __TBB_store_with_release(current->status, SUCCEEDED);
2583 else __TBB_store_with_release(current->status, FAILED);
2585 else __TBB_store_with_release(current->status, FAILED);
2588 bool build_succeeded;
2590 if(tuple_build_may_succeed()) {
2592 build_succeeded = try_to_make_tuple(out);
2593 if(build_succeeded) {
2594 if(my_successors.try_put(out)) {
2599 build_succeeded = false;
2602 } while(build_succeeded);
2604 __TBB_store_with_release(current->status, SUCCEEDED);
2605 forwarder_busy = false;
2611 // ---------- end aggregator -----------
2614 join_node_base(graph &g) : input_ports_type(g), forwarder_busy(false) {
2615 my_successors.set_owner(this);
2616 input_ports_type::set_my_node(this);
2617 my_aggregator.initialize_handler(my_handler(this));
2620 bool register_successor(successor_type &r) {
2621 join_node_base_operation op_data(r, reg_succ);
2622 my_aggregator.execute(&op_data);
2623 return op_data.status == SUCCEEDED;
2626 bool remove_successor( successor_type &r) {
2627 join_node_base_operation op_data(r, rem_succ);
2628 my_aggregator.execute(&op_data);
2629 return op_data.status == SUCCEEDED;
2632 bool try_get( output_type &v) {
2633 join_node_base_operation op_data(v, try__get);
2634 my_aggregator.execute(&op_data);
2635 return op_data.status == SUCCEEDED;
2639 broadcast_cache<output_type, null_rw_mutex> my_successors;
2641 friend class forward_task< join_node_base<JP, InputTuple, OutputTuple> >;
2644 join_node_base_operation op_data(do_fwrd);
2645 my_aggregator.execute(&op_data);
2649 //! unfolded_join_node : passes input_ports_tuple_type to join_node_base. We build the input port type
2650 // using tuple_element.
2651 template<int N, typename OutputTuple, join_policy JP>
2652 class unfolded_join_node;
2654 template<typename OutputTuple>
2655 class unfolded_join_node<2,OutputTuple,reserving> : public internal::join_node_base<reserving,
2657 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2658 reserving_port<typename std::tuple_element<1,OutputTuple>::type> >,
2663 typedef typename std::tuple<
2664 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2665 reserving_port<typename std::tuple_element<1,OutputTuple>::type> > input_ports_tuple_type;
2666 typedef OutputTuple output_type;
2668 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2670 unfolded_join_node(graph &g) : base_type(g) {}
2673 template<typename OutputTuple>
2674 class unfolded_join_node<3,OutputTuple,reserving> : public internal::join_node_base<reserving,
2676 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2677 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2678 reserving_port<typename std::tuple_element<2,OutputTuple>::type> >,
2683 typedef typename std::tuple<
2684 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2685 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2686 reserving_port<typename std::tuple_element<2,OutputTuple>::type> > input_ports_tuple_type;
2687 typedef OutputTuple output_type;
2689 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2691 unfolded_join_node(graph &g) : base_type(g) {}
2694 template<typename OutputTuple>
2695 class unfolded_join_node<4,OutputTuple,reserving> : public internal::join_node_base<reserving,
2697 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2698 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2699 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2700 reserving_port<typename std::tuple_element<3,OutputTuple>::type> >,
2704 typedef typename std::tuple<
2705 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2706 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2707 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2708 reserving_port<typename std::tuple_element<3,OutputTuple>::type> > input_ports_tuple_type;
2709 typedef OutputTuple output_type;
2711 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2713 unfolded_join_node(graph &g) : base_type(g) {}
2716 template<typename OutputTuple>
2717 class unfolded_join_node<5,OutputTuple,reserving> : public internal::join_node_base<reserving,
2719 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2720 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2721 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2722 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2723 reserving_port<typename std::tuple_element<4,OutputTuple>::type> >,
2727 typedef typename std::tuple<
2728 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2729 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2730 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2731 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2732 reserving_port<typename std::tuple_element<4,OutputTuple>::type> > input_ports_tuple_type;
2733 typedef OutputTuple output_type;
2735 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2737 unfolded_join_node(graph &g) : base_type(g) {}
2740 template<typename OutputTuple>
2741 class unfolded_join_node<6,OutputTuple,reserving> : public internal::join_node_base<reserving,
2743 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2744 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2745 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2746 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2747 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2748 reserving_port<typename std::tuple_element<5,OutputTuple>::type> >,
2752 typedef typename std::tuple<
2753 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2754 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2755 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2756 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2757 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2758 reserving_port<typename std::tuple_element<5,OutputTuple>::type> > input_ports_tuple_type;
2759 typedef OutputTuple output_type;
2761 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2763 unfolded_join_node(graph &g) : base_type(g) {}
2766 template<typename OutputTuple>
2767 class unfolded_join_node<7,OutputTuple,reserving> : public internal::join_node_base<reserving,
2769 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2770 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2771 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2772 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2773 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2774 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2775 reserving_port<typename std::tuple_element<6,OutputTuple>::type> >,
2779 typedef typename std::tuple<
2780 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2781 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2782 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2783 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2784 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2785 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2786 reserving_port<typename std::tuple_element<6,OutputTuple>::type> > input_ports_tuple_type;
2787 typedef OutputTuple output_type;
2789 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2791 unfolded_join_node(graph &g) : base_type(g) {}
2794 template<typename OutputTuple>
2795 class unfolded_join_node<8,OutputTuple,reserving> : public internal::join_node_base<reserving,
2797 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2798 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2799 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2800 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2801 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2802 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2803 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2804 reserving_port<typename std::tuple_element<7,OutputTuple>::type> >,
2808 typedef typename std::tuple<
2809 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2810 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2811 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2812 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2813 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2814 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2815 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2816 reserving_port<typename std::tuple_element<7,OutputTuple>::type> > input_ports_tuple_type;
2817 typedef OutputTuple output_type;
2819 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2821 unfolded_join_node(graph &g) : base_type(g) {}
2824 template<typename OutputTuple>
2825 class unfolded_join_node<9,OutputTuple,reserving> : public internal::join_node_base<reserving,
2827 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2828 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2829 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2830 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2831 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2832 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2833 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2834 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2835 reserving_port<typename std::tuple_element<8,OutputTuple>::type> >,
2839 typedef typename std::tuple<
2840 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2841 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2842 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2843 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2844 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2845 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2846 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2847 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2848 reserving_port<typename std::tuple_element<8,OutputTuple>::type> > input_ports_tuple_type;
2849 typedef OutputTuple output_type;
2851 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2853 unfolded_join_node(graph &g) : base_type(g) {}
2856 template<typename OutputTuple>
2857 class unfolded_join_node<10,OutputTuple,reserving> : public internal::join_node_base<reserving,
2859 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2860 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2861 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2862 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2863 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2864 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2865 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2866 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2867 reserving_port<typename std::tuple_element<8,OutputTuple>::type>,
2868 reserving_port<typename std::tuple_element<9,OutputTuple>::type> >,
2872 typedef typename std::tuple<
2873 reserving_port<typename std::tuple_element<0,OutputTuple>::type>,
2874 reserving_port<typename std::tuple_element<1,OutputTuple>::type>,
2875 reserving_port<typename std::tuple_element<2,OutputTuple>::type>,
2876 reserving_port<typename std::tuple_element<3,OutputTuple>::type>,
2877 reserving_port<typename std::tuple_element<4,OutputTuple>::type>,
2878 reserving_port<typename std::tuple_element<5,OutputTuple>::type>,
2879 reserving_port<typename std::tuple_element<6,OutputTuple>::type>,
2880 reserving_port<typename std::tuple_element<7,OutputTuple>::type>,
2881 reserving_port<typename std::tuple_element<8,OutputTuple>::type>,
2882 reserving_port<typename std::tuple_element<9,OutputTuple>::type> > input_ports_tuple_type;
2883 typedef OutputTuple output_type;
2885 typedef join_node_base<reserving, input_ports_tuple_type, output_type > base_type;
2887 unfolded_join_node(graph &g) : base_type(g) {}
2890 template<typename OutputTuple>
2891 class unfolded_join_node<2,OutputTuple,queueing> : public internal::join_node_base<queueing,
2893 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2894 queueing_port<typename std::tuple_element<1,OutputTuple>::type> >,
2899 typedef typename std::tuple<
2900 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2901 queueing_port<typename std::tuple_element<1,OutputTuple>::type> > input_ports_tuple_type;
2902 typedef OutputTuple output_type;
2904 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2906 unfolded_join_node(graph &g) : base_type(g) {}
2909 template<typename OutputTuple>
2910 class unfolded_join_node<3,OutputTuple,queueing> : public internal::join_node_base<queueing,
2912 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2913 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2914 queueing_port<typename std::tuple_element<2,OutputTuple>::type> >,
2919 typedef typename std::tuple<
2920 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2921 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2922 queueing_port<typename std::tuple_element<2,OutputTuple>::type> > input_ports_tuple_type;
2923 typedef OutputTuple output_type;
2925 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2927 unfolded_join_node(graph &g) : base_type(g) {}
2930 template<typename OutputTuple>
2931 class unfolded_join_node<4,OutputTuple,queueing> : public internal::join_node_base<queueing,
2933 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2934 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2935 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2936 queueing_port<typename std::tuple_element<3,OutputTuple>::type> >,
2940 typedef typename std::tuple<
2941 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2942 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2943 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2944 queueing_port<typename std::tuple_element<3,OutputTuple>::type> > input_ports_tuple_type;
2945 typedef OutputTuple output_type;
2947 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2949 unfolded_join_node(graph &g) : base_type(g) {}
2952 template<typename OutputTuple>
2953 class unfolded_join_node<5,OutputTuple,queueing> : public internal::join_node_base<queueing,
2955 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2956 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2957 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2958 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2959 queueing_port<typename std::tuple_element<4,OutputTuple>::type> >,
2963 typedef typename std::tuple<
2964 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2965 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2966 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2967 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2968 queueing_port<typename std::tuple_element<4,OutputTuple>::type> > input_ports_tuple_type;
2969 typedef OutputTuple output_type;
2971 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2973 unfolded_join_node(graph &g) : base_type(g) {}
2976 template<typename OutputTuple>
2977 class unfolded_join_node<6,OutputTuple,queueing> : public internal::join_node_base<queueing,
2979 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2980 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2981 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2982 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2983 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
2984 queueing_port<typename std::tuple_element<5,OutputTuple>::type> >,
2988 typedef typename std::tuple<
2989 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
2990 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
2991 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
2992 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
2993 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
2994 queueing_port<typename std::tuple_element<5,OutputTuple>::type> > input_ports_tuple_type;
2995 typedef OutputTuple output_type;
2997 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
2999 unfolded_join_node(graph &g) : base_type(g) {}
3002 template<typename OutputTuple>
3003 class unfolded_join_node<7,OutputTuple,queueing> : public internal::join_node_base<queueing,
3005 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3006 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3007 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3008 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3009 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3010 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3011 queueing_port<typename std::tuple_element<6,OutputTuple>::type> >,
3015 typedef typename std::tuple<
3016 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3017 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3018 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3019 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3020 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3021 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3022 queueing_port<typename std::tuple_element<6,OutputTuple>::type> > input_ports_tuple_type;
3023 typedef OutputTuple output_type;
3025 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3027 unfolded_join_node(graph &g) : base_type(g) {}
3030 template<typename OutputTuple>
3031 class unfolded_join_node<8,OutputTuple,queueing> : public internal::join_node_base<queueing,
3033 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3034 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3035 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3036 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3037 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3038 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3039 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3040 queueing_port<typename std::tuple_element<7,OutputTuple>::type> >,
3044 typedef typename std::tuple<
3045 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3046 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3047 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3048 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3049 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3050 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3051 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3052 queueing_port<typename std::tuple_element<7,OutputTuple>::type> > input_ports_tuple_type;
3053 typedef OutputTuple output_type;
3055 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3057 unfolded_join_node(graph &g) : base_type(g) {}
3060 template<typename OutputTuple>
3061 class unfolded_join_node<9,OutputTuple,queueing> : public internal::join_node_base<queueing,
3063 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3064 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3065 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3066 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3067 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3068 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3069 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3070 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3071 queueing_port<typename std::tuple_element<8,OutputTuple>::type> >,
3075 typedef typename std::tuple<
3076 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3077 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3078 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3079 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3080 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3081 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3082 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3083 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3084 queueing_port<typename std::tuple_element<8,OutputTuple>::type> > input_ports_tuple_type;
3085 typedef OutputTuple output_type;
3087 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3089 unfolded_join_node(graph &g) : base_type(g) {}
3092 template<typename OutputTuple>
3093 class unfolded_join_node<10,OutputTuple,queueing> : public internal::join_node_base<queueing,
3095 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3096 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3097 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3098 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3099 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3100 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3101 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3102 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3103 queueing_port<typename std::tuple_element<8,OutputTuple>::type>,
3104 queueing_port<typename std::tuple_element<9,OutputTuple>::type> >,
3108 typedef typename std::tuple<
3109 queueing_port<typename std::tuple_element<0,OutputTuple>::type>,
3110 queueing_port<typename std::tuple_element<1,OutputTuple>::type>,
3111 queueing_port<typename std::tuple_element<2,OutputTuple>::type>,
3112 queueing_port<typename std::tuple_element<3,OutputTuple>::type>,
3113 queueing_port<typename std::tuple_element<4,OutputTuple>::type>,
3114 queueing_port<typename std::tuple_element<5,OutputTuple>::type>,
3115 queueing_port<typename std::tuple_element<6,OutputTuple>::type>,
3116 queueing_port<typename std::tuple_element<7,OutputTuple>::type>,
3117 queueing_port<typename std::tuple_element<8,OutputTuple>::type>,
3118 queueing_port<typename std::tuple_element<9,OutputTuple>::type> > input_ports_tuple_type;
3119 typedef OutputTuple output_type;
3121 typedef join_node_base<queueing, input_ports_tuple_type, output_type > base_type;
3123 unfolded_join_node(graph &g) : base_type(g) {}
3126 //! templated function to refer to input ports of the join node
3127 template<size_t N, typename JNT>
3128 typename std::tuple_element<N, typename JNT::input_ports_tuple_type>::type &input_port(JNT &jn) {
3129 return std::get<N>(jn.inputs());
3132 } // namespace internal
3134 using namespace internal::join_policy_namespace;
3135 using internal::input_port;
3137 template<typename OutputTuple, join_policy JP=reserving>
3138 class join_node: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, OutputTuple, JP> {
3140 static const int N = std::tuple_size<OutputTuple>::value;
3141 typedef typename internal::unfolded_join_node<N, OutputTuple, JP> unfolded_type;
3143 typedef OutputTuple output_type;
3144 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
3145 join_node(graph &g) : unfolded_type(g) { }
3152 //! Makes an edge between a single predecessor and a single successor
3153 template< typename T >
3154 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3155 p.register_successor( s );
3158 //! Makes edges between a single predecessor and multiple successors
3159 template< typename T, typename SIterator >
3160 inline void make_edges( sender<T> &p, SIterator s_begin, SIterator s_end ) {
3161 for ( SIterator i = s_begin; i != s_end; ++i ) {
3162 make_edge( p, **i );
3166 //! Makes edges between a set of predecessors and a single successor
3167 template< typename T, typename PIterator >
3168 inline void make_edges( PIterator p_begin, PIterator p_end, receiver<T> &s ) {
3169 for ( PIterator i = p_begin; i != p_end; ++i ) {
3170 make_edge( **i, s );