2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
32 #if !TBB_PREVIEW_GRAPH
33 #error Set TBB_PREVIEW_GRAPH to include graph.h
36 #include "tbb_stddef.h"
38 #include "spin_mutex.h"
39 #include "null_mutex.h"
40 #include "spin_rw_mutex.h"
41 #include "null_rw_mutex.h"
43 #include "concurrent_vector.h"
44 #include "internal/_aggregator_impl.h"
46 // use the VC10 or gcc version of tuple if it is available.
47 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
48 #define TBB_PREVIEW_TUPLE 1
49 #include "compat/tuple"
58 \brief The graph related classes and functions
60 There are some applications that best express dependencies as messages
61 passed between nodes in a graph. These messages may contain data or
62 simply act as signals that a predecessor has completed. The graph
63 class and its associated node classes can be used to express such
70 //! An enumeration that provides the two most common concurrency levels: unlimited and serial
71 enum concurrency { unlimited = 0, serial = 1 };
73 namespace interface6 {
75 //! The base of all graph nodes. Allows them to be stored in a collection for deletion.
78 virtual ~graph_node() {}
81 //! An empty class used for messages that mean "I'm done"
82 class continue_msg {};
84 template< typename T > class sender;
85 template< typename T > class receiver;
86 class continue_receiver;
88 //! Pure virtual template class that defines a sender of messages of type T
89 template< typename T >
92 //! The output type of this sender
93 typedef T output_type;
95 //! The successor type for this node
96 typedef receiver<T> successor_type;
100 //! Add a new successor to this node
101 virtual bool register_successor( successor_type &r ) = 0;
103 //! Removes a successor from this node
104 virtual bool remove_successor( successor_type &r ) = 0;
106 //! Request an item from the sender
107 virtual bool try_get( T & ) { return false; }
109 //! Reserves an item in the sender
110 virtual bool try_reserve( T & ) { return false; }
112 //! Releases the reserved item
113 virtual bool try_release( ) { return false; }
115 //! Consumes the reserved item
116 virtual bool try_consume( ) { return false; }
121 //! Pure virtual template class that defines a receiver of messages of type T
122 template< typename T >
126 //! The input type of this receiver
127 typedef T input_type;
129 //! The predecessor type for this node
130 typedef sender<T> predecessor_type;
133 virtual ~receiver() {}
135 //! Put an item to the receiver
136 virtual bool try_put( const T& t ) = 0;
138 //! Add a predecessor to the node
139 virtual bool register_predecessor( predecessor_type & ) { return false; }
141 //! Remove a predecessor from the node
142 virtual bool remove_predecessor( predecessor_type & ) { return false; }
146 //! Base class for receivers of completion messages
147 /** These receivers automatically reset, but cannot be explicitly waited on */
148 class continue_receiver : public receiver< continue_msg > {
152 typedef continue_msg input_type;
154 //! The predecessor type for this node
155 typedef sender< continue_msg > predecessor_type;
158 continue_receiver( int number_of_predecessors = 0 ) {
159 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
160 my_current_count = 0;
164 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
165 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
166 my_current_count = 0;
170 virtual ~continue_receiver() { }
172 //! Increments the trigger threshold
173 /* override */ bool register_predecessor( predecessor_type & ) {
174 spin_mutex::scoped_lock l(my_mutex);
175 ++my_predecessor_count;
179 //! Decrements the trigger threshold
180 /** Does not check to see if the removal of the predecessor now makes the current count
181 exceed the new threshold. So removing a predecessor while the graph is active can cause
182 unexpected results. */
183 /* override */ bool remove_predecessor( predecessor_type & ) {
184 spin_mutex::scoped_lock l(my_mutex);
185 --my_predecessor_count;
189 //! Puts a continue_msg to the receiver
190 /** If the message causes the message count to reach the predecessor count, execute() is called and
191 the message count is reset to 0. Otherwise the message count is incremented. */
192 /* override */ bool try_put( const input_type & ) {
194 spin_mutex::scoped_lock l(my_mutex);
195 if ( ++my_current_count < my_predecessor_count )
198 my_current_count = 0;
207 int my_predecessor_count;
208 int my_current_count;
209 int my_initial_predecessor_count;
211 //! Does whatever should happen when the threshold is reached
212 /** This should be very fast or else spawn a task. This is
213 called while the sender is blocked in the try_put(). */
214 virtual void execute() = 0;
218 #include "internal/_flow_graph_impl.h"
219 using namespace internal::graph_policy_namespace;
222 /** This class serves as a handle to the graph */
223 class graph : tbb::internal::no_copy {
225 template< typename Body >
226 class run_task : public task {
228 run_task( Body& body ) : my_body(body) {}
237 template< typename Receiver, typename Body >
238 class run_and_put_task : public task {
240 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
242 my_receiver.try_put( my_body() );
246 Receiver &my_receiver;
253 //! Constructs a graph with no nodes.
254 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
255 my_root_task->set_ref_count(1);
258 //! Destroys the graph.
259 /** Calls wait_for_all on the graph, deletes all of the nodes appended by calls to add, and then
260 destroys the root task of the graph. */
263 my_root_task->set_ref_count(0);
264 task::destroy( *my_root_task );
268 //! Used to register that an external entity may still interact with the graph.
269 /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
271 void increment_wait_count() {
273 my_root_task->increment_ref_count();
276 //! Deregisters an external entity that may have interacted with the graph.
277 /** The graph will not return from wait_for_all until the number of decrement_wait_count calls
278 matches the number of increment_wait_count calls. */
279 void decrement_wait_count() {
281 my_root_task->decrement_ref_count();
284 //! Spawns a task that runs a body and puts its output to a specific receiver
285 /** The task is spawned as a child of the graph. This is useful for running tasks
286 that need to block a wait_for_all() on the graph. For example a one-off source. */
287 template< typename Receiver, typename Body >
288 void run( Receiver &r, Body body ) {
289 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
290 run_and_put_task< Receiver, Body >( r, body ) );
293 //! Spawns a task that runs a function object
294 /** The task is spawned as a child of the graph. This is useful for running tasks
295 that need to block a wait_for_all() on the graph. For example a one-off source. */
296 template< typename Body >
297 void run( Body body ) {
298 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
299 run_task< Body >( body ) );
302 //! Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
303 /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
304 void wait_for_all() {
306 my_root_task->wait_for_all();
307 my_root_task->set_ref_count(1);
310 //! Returns the root task of the graph
321 #include "internal/_flow_graph_node_impl.h"
323 //! An executable node that acts as a source, i.e. it has no predecessors
324 template < typename Output >
325 class source_node : public graph_node, public sender< Output > {
328 //! The type of the output message, which is complete
329 typedef Output output_type;
331 //! The type of successors of this node
332 typedef receiver< Output > successor_type;
334 //! Constructs a source node whose body generates the items it outputs; the node is active unless is_active is false
335 template< typename Body >
336 source_node( graph &g, Body body, bool is_active = true )
337 : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
338 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
339 my_reserved(false), my_has_cached_item(false)
341 my_successors.set_owner(this);
345 source_node( const source_node& src ) :
346 #if ( __TBB_GCC_VERSION < 40202 )
347 graph_node(), sender<Output>(),
349 my_root_task( src.my_root_task), my_active(src.init_my_active),
350 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
351 my_reserved(false), my_has_cached_item(false)
353 my_successors.set_owner(this);
357 ~source_node() { delete my_body; }
359 //! Add a new successor to this node
360 /* override */ bool register_successor( receiver<output_type> &r ) {
361 spin_mutex::scoped_lock lock(my_mutex);
362 my_successors.register_successor(r);
368 //! Removes a successor from this node
369 /* override */ bool remove_successor( receiver<output_type> &r ) {
370 spin_mutex::scoped_lock lock(my_mutex);
371 my_successors.remove_successor(r);
375 //! Request an item from the node
376 /*override */ bool try_get( output_type &v ) {
377 spin_mutex::scoped_lock lock(my_mutex);
381 if ( my_has_cached_item ) {
383 my_has_cached_item = false;
384 } else if ( (*my_body)(v) == false ) {
390 //! Reserves an item.
391 /* override */ bool try_reserve( output_type &v ) {
392 spin_mutex::scoped_lock lock(my_mutex);
397 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
398 my_has_cached_item = true;
400 if ( my_has_cached_item ) {
409 //! Release a reserved item.
410 /** true = item has been released and so remains in sender, dest must request or reserve future items */
411 /* override */ bool try_release( ) {
412 spin_mutex::scoped_lock lock(my_mutex);
413 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
419 //! Consumes a reserved item
420 /* override */ bool try_consume( ) {
421 spin_mutex::scoped_lock lock(my_mutex);
422 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
424 my_has_cached_item = false;
425 if ( !my_successors.empty() ) {
431 //! Activates a node that was created in the inactive state
433 spin_mutex::scoped_lock lock(my_mutex);
435 if ( !my_successors.empty() )
445 internal::source_body<output_type> *my_body;
446 internal::broadcast_cache< output_type > my_successors;
448 bool my_has_cached_item;
449 output_type my_cached_item;
451 friend class internal::source_task< source_node< output_type > >;
454 /* override */ void apply_body( ) {
456 if ( try_reserve(v) == false )
459 if ( my_successors.try_put( v ) )
465 //! Spawns a task that applies the body
466 /* override */ void spawn_put( ) {
467 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
468 internal::source_task< source_node< output_type > >( *this ) );
473 //! Implements a function node that supports Input -> Output
474 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
475 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
478 typedef Input input_type;
479 typedef Output output_type;
480 typedef sender< input_type > predecessor_type;
481 typedef receiver< output_type > successor_type;
484 template< typename Body >
485 function_node( graph &g, size_t concurrency, Body body )
486 : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
487 my_successors.set_owner(this);
491 function_node( const function_node& src ) :
492 #if ( __TBB_GCC_VERSION < 40202 )
495 internal::function_input<input_type,output_type,Allocator>( src )
496 #if ( __TBB_GCC_VERSION < 40202 )
497 , internal::function_output<Output>()
500 my_successors.set_owner(this);
505 internal::broadcast_cache<output_type> my_successors;
506 /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
510 //! Implements a function node that supports Input -> Output and queues inputs (queueing-policy specialization)
511 template < typename Input, typename Output, typename Allocator >
512 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
515 typedef Input input_type;
516 typedef Output output_type;
517 typedef sender< input_type > predecessor_type;
518 typedef receiver< output_type > successor_type;
521 template< typename Body >
522 function_node( graph &g, size_t concurrency, Body body )
523 : internal::function_input< input_type, output_type, Allocator >( g, concurrency, body, new internal::function_input_queue< input_type, Allocator >() ) {
524 my_successors.set_owner(this);
528 function_node( const function_node& src ) :
529 #if ( __TBB_GCC_VERSION < 40202 )
532 internal::function_input<input_type,output_type,Allocator>( src, new internal::function_input_queue< input_type, Allocator >() )
533 #if ( __TBB_GCC_VERSION < 40202 )
534 , internal::function_output<Output>()
537 my_successors.set_owner(this);
542 internal::broadcast_cache<output_type> my_successors;
543 /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
547 //! Implements an executable node that supports continue_msg -> Output
548 template <typename Output>
549 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
552 typedef continue_msg input_type;
553 typedef Output output_type;
554 typedef sender< input_type > predecessor_type;
555 typedef receiver< output_type > successor_type;
557 //! Constructor for executable node with continue_msg -> Output
558 template <typename Body >
559 continue_node( graph &g, Body body )
560 : internal::continue_input<output_type>( g, body ) {
561 my_successors.set_owner(this);
564 //! Constructor for executable node with continue_msg -> Output
565 template <typename Body >
566 continue_node( graph &g, int number_of_predecessors, Body body )
567 : internal::continue_input<output_type>( g, number_of_predecessors, body )
569 my_successors.set_owner(this);
573 continue_node( const continue_node& src ) :
574 #if ( __TBB_GCC_VERSION < 40202 )
577 internal::continue_input<output_type>(src)
578 #if ( __TBB_GCC_VERSION < 40202 )
579 , internal::function_output<Output>()
582 my_successors.set_owner(this);
587 internal::broadcast_cache<output_type> my_successors;
588 /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
592 template< typename T >
593 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
596 typedef T input_type;
597 typedef T output_type;
598 typedef sender< input_type > predecessor_type;
599 typedef receiver< output_type > successor_type;
601 overwrite_node() : my_buffer_is_valid(false) {
602 my_successors.set_owner( this );
605 // Copy constructor; doesn't take anything from src; default won't work
606 overwrite_node( const overwrite_node& ) :
607 #if ( __TBB_GCC_VERSION < 40202 )
608 graph_node(), receiver<T>(), sender<T>(),
610 my_buffer_is_valid(false)
612 my_successors.set_owner( this );
617 /* override */ bool register_successor( successor_type &s ) {
618 spin_mutex::scoped_lock l( my_mutex );
619 if ( my_buffer_is_valid ) {
620 // We have a valid value that must be forwarded immediately.
621 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
622 // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
623 my_successors.register_successor( s );
626 // We don't add the successor: it rejected our put and we became its predecessor instead
630 // No valid value yet, just add as successor
631 my_successors.register_successor( s );
636 /* override */ bool remove_successor( successor_type &s ) {
637 spin_mutex::scoped_lock l( my_mutex );
638 my_successors.remove_successor(s);
642 /* override */ bool try_put( const T &v ) {
643 spin_mutex::scoped_lock l( my_mutex );
645 my_buffer_is_valid = true;
646 my_successors.try_put(v);
650 /* override */ bool try_get( T &v ) {
651 spin_mutex::scoped_lock l( my_mutex );
652 if ( my_buffer_is_valid ) {
661 spin_mutex::scoped_lock l( my_mutex );
662 return my_buffer_is_valid;
666 spin_mutex::scoped_lock l( my_mutex );
667 my_buffer_is_valid = false;
673 internal::broadcast_cache< T, null_rw_mutex > my_successors;
675 bool my_buffer_is_valid;
679 template< typename T >
680 class write_once_node : public overwrite_node<T> {
683 typedef T input_type;
684 typedef T output_type;
685 typedef sender< input_type > predecessor_type;
686 typedef receiver< output_type > successor_type;
689 write_once_node() : overwrite_node<T>() {}
691 //! Copy constructor: call base class copy constructor
692 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
694 /* override */ bool try_put( const T &v ) {
695 spin_mutex::scoped_lock l( this->my_mutex );
696 if ( this->my_buffer_is_valid ) {
700 this->my_buffer_is_valid = true;
701 this->my_successors.try_put(v);
707 //! Forwards messages of type T to all successors
708 template <typename T>
709 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
711 internal::broadcast_cache<T> my_successors;
715 typedef T input_type;
716 typedef T output_type;
717 typedef sender< input_type > predecessor_type;
718 typedef receiver< output_type > successor_type;
721 my_successors.set_owner( this );
725 broadcast_node( const broadcast_node& )
726 #if ( __TBB_GCC_VERSION < 40202 )
727 : graph_node(), receiver<T>(), sender<T>()
730 my_successors.set_owner( this );
734 virtual bool register_successor( receiver<T> &r ) {
735 my_successors.register_successor( r );
739 //! Removes r as a successor
740 virtual bool remove_successor( receiver<T> &r ) {
741 my_successors.remove_successor( r );
745 /* override */ bool try_put( const T &t ) {
746 my_successors.try_put(t);
752 #include "internal/_flow_graph_item_buffer_impl.h"
754 //! Forwards messages in arbitrary order
755 template <typename T, typename A=cache_aligned_allocator<T> >
756 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
758 typedef T input_type;
759 typedef T output_type;
760 typedef sender< input_type > predecessor_type;
761 typedef receiver< output_type > successor_type;
762 typedef buffer_node<T, A> my_class;
764 typedef size_t size_type;
765 internal::round_robin_cache< T, null_rw_mutex > my_successors;
769 friend class internal::forward_task< buffer_node< T, A > >;
771 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
772 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
774 // implements the aggregator_operation concept
775 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
780 buffer_operation(const T& e, op_type t) :
781 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
782 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
786 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
787 friend class internal::aggregating_functor<my_class, buffer_operation>;
788 internal::aggregator< my_handler, buffer_operation> my_aggregator;
790 virtual void handle_operations(buffer_operation *op_list) {
791 buffer_operation *tmp;
792 bool try_forwarding=false;
795 op_list = op_list->next;
797 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
798 case rem_succ: internal_rem_succ(tmp); break;
799 case req_item: internal_pop(tmp); break;
800 case res_item: internal_reserve(tmp); break;
801 case rel_res: internal_release(tmp); try_forwarding = true; break;
802 case con_res: internal_consume(tmp); try_forwarding = true; break;
803 case put_item: internal_push(tmp); try_forwarding = true; break;
804 case try_fwd: internal_forward(tmp); break;
807 if (try_forwarding && !forwarder_busy) {
808 forwarder_busy = true;
809 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
813 //! This is executed by an enqueued task, the "forwarder"
814 virtual void forward() {
815 buffer_operation op_data(try_fwd);
817 op_data.status = WAIT;
818 my_aggregator.execute(&op_data);
819 } while (op_data.status == SUCCEEDED);
822 //! Register successor
823 virtual void internal_reg_succ(buffer_operation *op) {
824 my_successors.register_successor(*(op->r));
825 __TBB_store_with_release(op->status, SUCCEEDED);
829 virtual void internal_rem_succ(buffer_operation *op) {
830 my_successors.remove_successor(*(op->r));
831 __TBB_store_with_release(op->status, SUCCEEDED);
834 //! Tries to forward valid items to successors
835 virtual void internal_forward(buffer_operation *op) {
837 bool success = false; // flagged when a successor accepts
838 size_type counter = my_successors.size();
839 // Try forwarding, giving each successor a chance
840 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
841 this->fetch_back(i_copy);
842 if( my_successors.try_put(i_copy) ) {
843 this->invalidate_back();
845 success = true; // found an accepting successor
849 if (success && !counter)
850 __TBB_store_with_release(op->status, SUCCEEDED);
852 __TBB_store_with_release(op->status, FAILED);
853 forwarder_busy = false;
857 virtual void internal_push(buffer_operation *op) {
858 this->push_back(*(op->elem));
859 __TBB_store_with_release(op->status, SUCCEEDED);
862 virtual void internal_pop(buffer_operation *op) {
863 if(this->pop_back(*(op->elem))) {
864 __TBB_store_with_release(op->status, SUCCEEDED);
867 __TBB_store_with_release(op->status, FAILED);
871 virtual void internal_reserve(buffer_operation *op) {
872 if(this->reserve_front(*(op->elem))) {
873 __TBB_store_with_release(op->status, SUCCEEDED);
876 __TBB_store_with_release(op->status, FAILED);
880 virtual void internal_consume(buffer_operation *op) {
881 this->consume_front();
882 __TBB_store_with_release(op->status, SUCCEEDED);
885 virtual void internal_release(buffer_operation *op) {
886 this->release_front();
887 __TBB_store_with_release(op->status, SUCCEEDED);
// NOTE: buffer_node derives from reservable_item_buffer<T, A>, so the base must be
// initialized with both template arguments. Writing reservable_item_buffer<T>() names
// that base only when A equals reservable_item_buffer's default allocator argument
// (presumably cache_aligned_allocator<T>; confirm in _flow_graph_item_buffer_impl.h).
// With any custom allocator it would name a non-base class and fail to compile.
892 buffer_node( graph &g ) : reservable_item_buffer<T, A>(),
893 my_parent( g.root_task() ), forwarder_busy(false) {
894 my_successors.set_owner(this);
895 my_aggregator.initialize_handler(my_handler(this));
899 buffer_node( const buffer_node& src ) :
900 #if ( __TBB_GCC_VERSION < 40202 )
903 reservable_item_buffer<T, A>(),
904 #if ( __TBB_GCC_VERSION < 40202 )
905 receiver<T>(), sender<T>(),
907 my_parent( src.my_parent )
909 forwarder_busy = false;
910 my_successors.set_owner(this);
911 my_aggregator.initialize_handler(my_handler(this));
914 virtual ~buffer_node() {}
917 // message sender implementation
920 //! Adds a new successor.
921 /** Adds successor r to the list of successors; may trigger forwarding of buffered items. */
922 /* override */ bool register_successor( receiver<output_type> &r ) {
923 buffer_operation op_data(reg_succ);
925 my_aggregator.execute(&op_data);
929 //! Removes a successor.
930 /** Removes successor r from the list of successors.
931 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
932 /* override */ bool remove_successor( receiver<output_type> &r ) {
933 r.remove_predecessor(*this);
934 buffer_operation op_data(rem_succ);
936 my_aggregator.execute(&op_data);
940 //! Request an item from the buffer_node
941 /** true = v contains the returned item<BR>
942 false = no item has been returned */
943 /* override */ bool try_get( T &v ) {
944 buffer_operation op_data(req_item);
946 my_aggregator.execute(&op_data);
947 return (op_data.status==SUCCEEDED);
950 //! Reserves an item.
951 /** false = no item can be reserved<BR>
952 true = an item is reserved */
953 /* override */ bool try_reserve( T &v ) {
954 buffer_operation op_data(res_item);
956 my_aggregator.execute(&op_data);
957 return (op_data.status==SUCCEEDED);
960 //! Release a reserved item.
961 /** true = item has been released and so remains in sender */
962 /* override */ bool try_release() {
963 buffer_operation op_data(rel_res);
964 my_aggregator.execute(&op_data);
968 //! Consumes a reserved item.
969 /** true = item is removed from sender and reservation removed */
970 /* override */ bool try_consume() {
971 buffer_operation op_data(con_res);
972 my_aggregator.execute(&op_data);
977 /** true is always returned */
978 /* override */ bool try_put(const T &t) {
979 buffer_operation op_data(t, put_item);
980 my_aggregator.execute(&op_data);
986 //! Forwards messages in FIFO order
987 template <typename T, typename A=cache_aligned_allocator<T> >
988 class queue_node : public buffer_node<T, A> {
990 typedef typename buffer_node<T, A>::size_type size_type;
991 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
993 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
995 //! Tries to forward valid items to successors
996 /* override */ void internal_forward(queue_operation *op) {
998 bool success = false; // flagged when a successor accepts
999 size_type counter = this->my_successors.size();
1000 if (this->my_reserved || !this->item_valid(this->my_head)){
1001 __TBB_store_with_release(op->status, FAILED);
1002 this->forwarder_busy = false;
1005 // Keep trying to send items while there is at least one accepting successor
1006 while (counter>0 && this->item_valid(this->my_head)) {
1007 this->fetch_front(i_copy);
1008 if(this->my_successors.try_put(i_copy)) {
1009 this->invalidate_front();
1011 success = true; // found an accepting successor
1015 if (success && !counter)
1016 __TBB_store_with_release(op->status, SUCCEEDED);
1018 __TBB_store_with_release(op->status, FAILED);
1019 this->forwarder_busy = false;
1023 /* override */ void internal_pop(queue_operation *op) {
1024 if ( this->my_reserved || !this->item_valid(this->my_head)){
1025 __TBB_store_with_release(op->status, FAILED);
1028 this->pop_front(*(op->elem));
1029 __TBB_store_with_release(op->status, SUCCEEDED);
1032 /* override */ void internal_reserve(queue_operation *op) {
1033 if (this->my_reserved || !this->item_valid(this->my_head)) {
1034 __TBB_store_with_release(op->status, FAILED);
1037 this->my_reserved = true;
1038 this->fetch_front(*(op->elem));
1039 this->invalidate_front();
1040 __TBB_store_with_release(op->status, SUCCEEDED);
1043 /* override */ void internal_consume(queue_operation *op) {
1044 this->consume_front();
1045 __TBB_store_with_release(op->status, SUCCEEDED);
1050 typedef T input_type;
1051 typedef T output_type;
1052 typedef sender< input_type > predecessor_type;
1053 typedef receiver< output_type > successor_type;
1056 queue_node( graph &g ) : buffer_node<T, A>(g) {}
1058 //! Copy constructor
1059 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
1062 //! Forwards messages in sequence order
1063 template< typename T, typename A=cache_aligned_allocator<T> >
1064 class sequencer_node : public queue_node<T, A> {
1065 internal::function_body< T, size_t > *my_sequencer;
1068 typedef T input_type;
1069 typedef T output_type;
1070 typedef sender< input_type > predecessor_type;
1071 typedef receiver< output_type > successor_type;
1074 template< typename Sequencer >
1075 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
1076 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
1078 //! Copy constructor
1079 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
1080 my_sequencer( src.my_sequencer->clone() ) {}
1083 ~sequencer_node() { delete my_sequencer; }
1085 typedef typename buffer_node<T, A>::size_type size_type;
1086 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
1088 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1091 /* override */ void internal_push(sequencer_operation *op) {
1092 size_type tag = (*my_sequencer)(*(op->elem));
1094 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1096 if(this->size() > this->capacity())
1097 this->grow_my_array(this->size()); // tail already has 1 added to it
1098 this->item(tag) = std::make_pair( *(op->elem), true );
1099 __TBB_store_with_release(op->status, SUCCEEDED);
1103 //! Forwards messages in priority order
1104 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
1105 class priority_queue_node : public buffer_node<T, A> {
1107 typedef T input_type;
1108 typedef T output_type;
1109 typedef sender< input_type > predecessor_type;
1110 typedef receiver< output_type > successor_type;
1113 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
1115 //! Copy constructor
1116 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
1119 typedef typename buffer_node<T, A>::size_type size_type;
1120 typedef typename buffer_node<T, A>::item_type item_type;
1121 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
1123 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1125 /* override */ void handle_operations(prio_operation *op_list) {
1126 prio_operation *tmp /*, *pop_list*/ ;
1127 bool try_forwarding=false;
1130 op_list = op_list->next;
1131 switch (tmp->type) {
1132 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
1133 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
1134 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
1135 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
1136 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
1137 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
1138 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
1139 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
1142 // process pops! for now, no special pop processing
1143 if (mark<this->my_tail) heapify();
1144 if (try_forwarding && !this->forwarder_busy) {
1145 this->forwarder_busy = true;
1146 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
1150 //! Tries to forward valid items to successors
/** Fails immediately if an item is reserved or the buffer is empty. In that
    case status is FAILED and forwarder_busy is cleared.
    Otherwise, while counter > 0 and my_tail > 0, it offers a copy of the top
    element my_array[0] to my_successors. When a successor accepts:
    mark is decremented if the whole array was heaped, my_array[my_tail] is
    copied into slot 0, and reheap() runs if my_tail > 1.
    It reports SUCCEEDED only if some successor accepted and counter reached
    0. Otherwise it reports FAILED and clears forwarder_busy.
    NOTE(review): only my_array[0] is offered. Elements appended earlier in
    the same batch (index >= mark) are not heapified until
    handle_operations() finishes. A higher-priority pending item can
    therefore be passed over here; internal_pop() at least checks the last
    element for this case.
    NOTE(review): some lines (e.g. 1152, 1167, 1170, 1172-1174) are absent
    from this listing. They presumably include the declaration of i_copy
    and the decrements of my_tail and counter. */
1151 /* override */ void internal_forward(prio_operation *op) {
1153 bool success = false; // flagged when a successor accepts
1154 size_type counter = this->my_successors.size();
1156 if (this->my_reserved || this->my_tail == 0) {
1157 __TBB_store_with_release(op->status, FAILED);
1158 this->forwarder_busy = false;
1161 // Keep trying to send while there exists an accepting successor
1162 while (counter>0 && this->my_tail > 0) {
1163 i_copy = this->my_array[0].first;
1164 bool msg = this->my_successors.try_put(i_copy);
1165 if ( msg == true ) {
1166 if (mark == this->my_tail) --mark;
1168 this->my_array[0].first=this->my_array[this->my_tail].first;
1169 if (this->my_tail > 1) // don't reheap for heap of size 1
1171 success = true; // found an accepting successor
1175 if (success && !counter)
1176 __TBB_store_with_release(op->status, SUCCEEDED);
1178 __TBB_store_with_release(op->status, FAILED);
1179 this->forwarder_busy = false;
//! Appends *(op->elem) at index my_tail, growing the array first if it is full.
/** The slot is stored as a valid entry (second == true) at index my_tail,
    which is at or beyond mark, so it is not yet part of the heap.
    handle_operations() calls heapify() after the batch. Always reports
    SUCCEEDED.
    NOTE(review): line 1187 (presumably the increment of my_tail) is absent
    from this listing. */
1183 /* override */ void internal_push(prio_operation *op) {
1184 if ( this->my_tail >= this->my_array_size )
1185 this->grow_my_array( this->my_tail + 1 );
1186 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
1188 __TBB_store_with_release(op->status, SUCCEEDED);
//! Removes an item (normally the highest-priority one) into *(op->elem).
/** Fails with status FAILED if an item is reserved or the buffer is empty.
    Convention: compare(a, b) == true means b outranks a (see the comment
    at 1198).
    Case 1: unheaped elements exist (mark < my_tail) and the last one
    outranks the heap top. That last element is returned.
    Case 2: otherwise the heap top my_array[0] is returned. mark is
    decremented if the whole array was heaped, and my_array[my_tail] is
    copied into slot 0. reheap() runs when my_tail > 1.
    SUCCEEDED is published as soon as the element has been copied out,
    before the heap is repaired.
    NOTE(review): only the last unheaped element is compared with the top.
    With several unheaped elements, a higher-priority one earlier in that
    range can be skipped.
    NOTE(review): some lines (e.g. 1193-1194, 1201, 1207) are absent from
    this listing. */
1190 /* override */ void internal_pop(prio_operation *op) {
1191 if ( this->my_reserved == true || this->my_tail == 0 ) {
1192 __TBB_store_with_release(op->status, FAILED);
1195 if (mark<this->my_tail &&
1196 compare(this->my_array[0].first,
1197 this->my_array[this->my_tail-1].first)) {
1198 // there are newly pushed elems; last one higher than top
1200 *(op->elem) = this->my_array[this->my_tail-1].first;
1202 __TBB_store_with_release(op->status, SUCCEEDED);
1204 else { // extract and push the last element down heap
1205 *(op->elem) = this->my_array[0].first; // copy the data
1206 if (mark == this->my_tail) --mark;
1208 __TBB_store_with_release(op->status, SUCCEEDED);
1209 this->my_array[0].first=this->my_array[this->my_tail].first;
1210 if (this->my_tail > 1) // don't reheap for heap of size 1
//! Reserves the heap top: copies my_array[0] into both *(op->elem) and reserved_item.
/** Fails with status FAILED if an item is already reserved or the buffer is
    empty. On success:
    - my_reserved is set and the item is removed from the array, the same way
      internal_pop() removes the top;
    - SUCCEEDED is published before reheap() runs.
    internal_consume() later clears the reservation; internal_release()
    re-appends reserved_item instead.
    NOTE(review): unlike internal_pop(), this does not check whether a
    pending unheaped element (index >= mark) outranks my_array[0].
    NOTE(review): some lines (e.g. 1218-1219, 1223) are absent from this
    listing. */
1215 /* override */ void internal_reserve(prio_operation *op) {
1216 if (this->my_reserved == true || this->my_tail == 0) {
1217 __TBB_store_with_release(op->status, FAILED);
1220 this->my_reserved = true;
1221 *(op->elem) = reserved_item = this->my_array[0].first;
1222 if (mark == this->my_tail) --mark;
1224 __TBB_store_with_release(op->status, SUCCEEDED);
1225 this->my_array[0].first = this->my_array[this->my_tail].first;
1226 if (this->my_tail > 1) // don't reheap for heap of size 1
//! Completes a reservation: clears my_reserved and reports SUCCEEDED.
/** internal_reserve() has already taken the item out of the array, so
    nothing is put back here. */
1230 /* override */ void internal_consume(prio_operation *op) {
1231 this->my_reserved = false;
1232 __TBB_store_with_release(op->status, SUCCEEDED);
//! Cancels a reservation by re-appending reserved_item at index my_tail.
/** Grows the array if it is full and stores reserved_item as a valid (true)
    slot. It then clears my_reserved and reports SUCCEEDED. The item lands
    in the unheaped region and is sifted in by a later heapify().
    NOTE(review): line 1238 (presumably the increment of my_tail) is absent
    from this listing. */
1234 /* override */ void internal_release(prio_operation *op) {
1235 if (this->my_tail >= this->my_array_size)
1236 this->grow_my_array( this->my_tail + 1 );
1237 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
1239 this->my_reserved = false;
1240 __TBB_store_with_release(op->status, SUCCEEDED);
// Copy of the item handed out by internal_reserve(); internal_release()
// re-appends it to the array.
1246 input_type reserved_item;
// NOTE(review): lines 1247-1248 and 1256, 1258-1259 are absent from this listing.
// The statements below are presumably the body of heapify() (called from
// handle_operations()); its header is among the missing lines.
//
// heapify: sifts every unheaped element (indices mark .. my_tail-1) up toward
// the root. A single element is already a heap, so mark starts at 1.
// (cur_pos-1)>>1 is the parent index. A parent is moved down into cur_pos when
// compare(parent, to_place) is true, i.e. to_place outranks it. The exit when
// !compare(...) is on a missing line. When the loop ends, mark == my_tail.
1249 if (!mark) mark = 1;
1250 for (; mark<this->my_tail; ++mark) { // for each unheaped element
1251 size_type cur_pos = mark;
1252 input_type to_place = this->my_array[mark].first;
1253 do { // push to_place up the heap
1254 size_type parent = (cur_pos-1)>>1;
1255 if (!compare(this->my_array[parent].first, to_place))
1257 this->my_array[cur_pos].first = this->my_array[parent].first;
1260 this->my_array[cur_pos].first = to_place;
// NOTE(review): the function header and several lines (e.g. 1261-1264, 1268,
// 1271, 1275, 1277) are absent from this listing. These statements are
// presumably the body of reheap(). internal_forward(), internal_pop() and
// internal_reserve() call it after copying my_array[my_tail] into slot 0.
//
// reheap: sifts the element held in my_array[my_tail] down from the root,
// within the heaped prefix [0, mark). At each level, target is the
// higher-priority child.
// The test at 1273-1274 presumably ends the descent once compare(target,
// my_array[my_tail]) is true, i.e. the element being placed outranks that child.
// Otherwise the child moves up into cur_pos. The element is finally written
// into cur_pos.
1265 size_type cur_pos=0, child=1;
1266 while (child < mark) {
1267 size_type target = child;
1269 compare(this->my_array[child].first,
1270 this->my_array[child+1].first))
1272 // target now has the higher priority child
1273 if (compare(this->my_array[target].first,
1274 this->my_array[this->my_tail].first))
1276 this->my_array[cur_pos].first = this->my_array[target].first;
1278 child = (cur_pos<<1)+1;
1280 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
1284 //! Forwards messages only if the threshold has not been reached
1285 /** This node forwards items until its threshold is reached.
1286 It contains no buffering. If the downstream node rejects, the
1287 message is dropped. */
// NOTE(review): several short lines of this class (access specifiers, some
// member declarations, closing braces and return statements, e.g. 1296-1299,
// 1313-1314, 1319, 1386-1390) are absent from this listing.
1288 template< typename T >
1289 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1292 typedef T input_type;
1293 typedef T output_type;
1294 typedef sender< input_type > predecessor_type;
1295 typedef receiver< output_type > successor_type;
// Upper bound for my_count: try_put rejects items once my_count >= my_threshold.
1300 size_t my_threshold;
1302 internal::predecessor_cache< T > my_predecessors;
// Held around every my_count check and forward_task enqueue visible below.
1303 spin_mutex my_mutex;
1304 internal::broadcast_cache< T > my_successors;
// Predecessor count the decrement port was built with; kept so the copy
// constructor can build an equivalent port.
1305 int init_decrement_predecessors;
1307 friend class internal::forward_task< limiter_node<T> >;
1309 // Let decrementer call decrement_counter()
1310 friend class internal::decrementer< limiter_node<T> >;
// Called by the decrementer (and at 1334 below). It tries to pull one item v
// from a cached predecessor and push it to the successors. If either step
// fails, it takes my_mutex and, if predecessors remain, enqueues a
// forward_task to try again.
1312 void decrement_counter() {
1315 // If we can't get / put an item immediately then drop the count
1316 if ( my_predecessors.get_item( v ) == false
1317 || my_successors.try_put(v) == false ) {
1318 spin_mutex::scoped_lock lock(my_mutex);
1320 if ( !my_predecessors.empty() )
1321 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1322 internal::forward_task< limiter_node<T> >( *this ) );
// NOTE(review): the header of the function containing the next lines is
// absent from this listing (presumably the method forward_task runs). Under
// my_mutex it tests my_count < my_threshold and presumably calls
// decrement_counter() only when below the threshold.
1328 spin_mutex::scoped_lock lock(my_mutex);
1329 if ( my_count < my_threshold )
1334 decrement_counter();
1339 //! The internal receiver< continue_msg > that decrements the count
1340 internal::decrementer< limiter_node<T> > decrement;
//! Constructs a limiter in graph g that admits items while my_count < threshold.
/** num_decrement_predecessors is passed to the decrement port and remembered
    in init_decrement_predecessors. my_count starts at 0. Both caches and the
    decrement port are given this node as owner. */
1343 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
1344 my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
1345 init_decrement_predecessors(num_decrement_predecessors),
1346 decrement(num_decrement_predecessors)
1348 my_predecessors.set_owner(this);
1349 my_successors.set_owner(this);
1350 decrement.set_owner(this);
1353 //! Copy constructor
// Copies the root task, threshold and decrement-predecessor count from src.
// my_count restarts at 0. The caches do not appear in the visible initializer
// list, so src's predecessors and successors are not copied.
// NOTE(review): the #endif matching the #if below is absent from this listing.
1354 limiter_node( const limiter_node& src ) :
1355 #if ( __TBB_GCC_VERSION < 40202 )
1356 graph_node(), receiver<T>(), sender<T>(),
1358 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
1359 init_decrement_predecessors(src.init_decrement_predecessors),
1360 decrement(src.init_decrement_predecessors)
1362 my_predecessors.set_owner(this);
1363 my_successors.set_owner(this);
1364 decrement.set_owner(this);
1367 //! Registers r as a successor of this node (via my_successors.register_successor)
1368 /* override */ bool register_successor( receiver<output_type> &r ) {
1369 my_successors.register_successor(r);
1373 //! Removes a successor from this node
1374 /** r.remove_predecessor(*this) is also called. */
1375 /* override */ bool remove_successor( receiver<output_type> &r ) {
1376 r.remove_predecessor(*this);
1377 my_successors.remove_successor(r);
1381 //! Puts an item to this receiver
/** Under my_mutex, rejects the item when my_count >= my_threshold.
    Otherwise offers t to my_successors. If they do not accept it, it takes
    my_mutex again and, when predecessors are cached, enqueues a
    forward_task.
    NOTE(review): lines 1383, 1386-1390, 1392, 1395 and 1399-1403 are absent
    from this listing. They presumably include the my_count updates and the
    return statement. */
1382 /* override */ bool try_put( const T &t ) {
1384 spin_mutex::scoped_lock lock(my_mutex);
1385 if ( my_count >= my_threshold )
1391 bool msg = my_successors.try_put(t);
1393 if ( msg != true ) {
1394 spin_mutex::scoped_lock lock(my_mutex);
1396 if ( !my_predecessors.empty() )
1397 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1398 internal::forward_task< limiter_node<T> >( *this ) );
1404 //! Adds src to the list of cached predecessors.
// Under my_mutex: if below the threshold and at least one successor exists,
// a forward_task is also enqueued.
1405 /* override */ bool register_predecessor( predecessor_type &src ) {
1406 spin_mutex::scoped_lock lock(my_mutex);
1407 my_predecessors.add( src );
1408 if ( my_count < my_threshold && !my_successors.empty() )
1409 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
1410 internal::forward_task< limiter_node<T> >( *this ) );
1414 //! Removes src from the list of cached predecessors.
// NOTE(review): unlike register_predecessor, this does not take my_mutex. That
// is safe only if predecessor_cache::remove synchronizes internally -- confirm.
1415 /* override */ bool remove_predecessor( predecessor_type &src ) {
1416 my_predecessors.remove( src );
1422 #include "internal/_flow_graph_join_impl.h"
1424 using internal::reserving_port;
1425 using internal::queueing_port;
1426 using internal::tag_matching_port;
1427 using internal::input_port;
1428 using internal::tag_value;
1429 using internal::NO_TAG;
// Primary template, declared only. JP selects the input-port buffering policy
// and defaults to queueing.
1431 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
//! join_node with the reserving policy.
/** A thin wrapper over internal::unfolded_join_node parameterized with
    reserving_port and N = std::tuple_size<OutputTuple>::value (presumably
    one input port per tuple element). Both constructors just forward to
    the base. */
1433 template<typename OutputTuple>
1434 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
1436 static const int N = std::tuple_size<OutputTuple>::value;
1437 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
1439 typedef OutputTuple output_type;
1440 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1441 join_node(graph &g) : unfolded_type(g) { }
1442 join_node(const join_node &other) : unfolded_type(other) {}
//! join_node with the queueing policy (the default JP).
/** A thin wrapper over internal::unfolded_join_node parameterized with
    queueing_port and N = std::tuple_size<OutputTuple>::value. Both
    constructors just forward to the base. */
1445 template<typename OutputTuple>
1446 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
1448 static const int N = std::tuple_size<OutputTuple>::value;
1449 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
1451 typedef OutputTuple output_type;
1452 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1453 join_node(graph &g) : unfolded_type(g) { }
1454 join_node(const join_node &other) : unfolded_type(other) {}
1457 // template for tag_matching join_node
/** A wrapper over internal::unfolded_join_node parameterized with
    tag_matching_port. Each constructor takes the graph plus 2 to 10
    additional arguments b0..b9 and forwards them unchanged to
    unfolded_type. These are presumably one tag-extraction functor per input
    port -- confirm in internal/_flow_graph_join_impl.h. Unlike the
    reserving/queueing variants, there is no constructor taking only a
    graph. */
1458 template<typename OutputTuple>
1459 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
1460 tag_matching_port, OutputTuple, tag_matching> {
1462 static const int N = std::tuple_size<OutputTuple>::value;
1463 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
1465 typedef OutputTuple output_type;
1466 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
1467 template<typename B0, typename B1>
1468 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
1469 template<typename B0, typename B1, typename B2>
1470 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
1471 template<typename B0, typename B1, typename B2, typename B3>
1472 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
1473 template<typename B0, typename B1, typename B2, typename B3, typename B4>
1474 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
1475 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
1476 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
1477 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
1478 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
1479 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
1480 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
1481 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
1482 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
1483 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
1484 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
1485 join_node(const join_node &other) : unfolded_type(other) {}
1489 #include "internal/_flow_graph_or_impl.h"
//! or_node: a thin wrapper over internal::unfolded_or_node<InputTuple>.
/** output_type comes from internal::or_output_type<N, InputTuple>, with
    N = std::tuple_size<InputTuple>::value. The copy constructor ignores its
    argument and default-constructs the base, so no state is copied from the
    source node. */
1491 template<typename InputTuple>
1492 class or_node : public internal::unfolded_or_node<InputTuple> {
1494 static const int N = std::tuple_size<InputTuple>::value;
1496 typedef typename internal::or_output_type<N,InputTuple>::type output_type;
1497 typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
1498 or_node() : unfolded_type() { }
1500 or_node( const or_node& /*other*/ ) : unfolded_type() { }
1503 //! Makes an edge between a single predecessor and a single successor
// Implemented as p.register_successor(s); any further bookkeeping is up to
// p's register_successor override.
1504 template< typename T >
1505 inline void make_edge( sender<T> &p, receiver<T> &s ) {
1506 p.register_successor( s );
1509 //! Removes the edge between a single predecessor and a single successor
// Implemented as p.remove_successor(s); any further bookkeeping is up to
// p's remove_successor override.
1510 template< typename T >
1511 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
1512 p.remove_successor( s );
1515 //! Returns a copy of the body from a function or continue node
/** Body appears only in the return type, so it cannot be deduced and must be
    given explicitly, e.g. copy_body<MyBody>(node). The copy is obtained
    from n.template copy_function_object<Body>(). */
1516 template< typename Body, typename Node >
1517 Body copy_body( Node &n ) {
1518 return n.template copy_function_object<Body>();
1524 using interface6::graph;
1525 using interface6::graph_node;
1526 using interface6::continue_msg;
1527 using interface6::sender;
1528 using interface6::receiver;
1529 using interface6::continue_receiver;
1531 using interface6::source_node;
1532 using interface6::function_node;
1533 using interface6::continue_node;
1534 using interface6::overwrite_node;
1535 using interface6::write_once_node;
1536 using interface6::broadcast_node;
1537 using interface6::buffer_node;
1538 using interface6::queue_node;
1539 using interface6::sequencer_node;
1540 using interface6::priority_queue_node;
1541 using interface6::limiter_node;
1542 using namespace interface6::internal::graph_policy_namespace;
1543 using interface6::join_node;
1544 using interface6::or_node;
1545 using interface6::input_port;
1546 using interface6::copy_body;
1547 using interface6::make_edge;
1548 using interface6::remove_edge;
1549 using interface6::internal::NO_TAG;
1550 using interface6::internal::tag_value;