2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB__flow_graph_impl_H
30 #define __TBB__flow_graph_impl_H
32 #ifndef __TBB_flow_graph_H
33 #error Do not #include this internal file directly; use public TBB headers instead.
// Namespace that scopes the graph_buffer_policy enumerators. Only the four names are
// declared here; what each policy means is decided by the code that uses them, which
// is not part of this listing.
38 namespace graph_policy_namespace {
39 enum graph_buffer_policy { rejecting, reserving, queueing, tag_matching };
42 //! A functor that takes no input and generates a value of type Output
// Abstract interface: operator() and clone() are pure virtual and the destructor is
// virtual. The class derives (privately, by the class default) from
// tbb::internal::no_assign, which is declared outside this listing.
43 template< typename Output >
44 class source_body : tbb::internal::no_assign {
46 virtual ~source_body() {}
// The non-const reference lets an implementation write the generated value into
// 'output'. The meaning of the bool result is defined by callers not shown here.
47 virtual bool operator()(Output &output) = 0;
// Returns a pointer to a source_body; who owns the returned object is not shown here.
48 virtual source_body* clone() = 0;
51 //! The leaf for source_body
// Concrete source_body that wraps a user-supplied Body object.
52 template< typename Output, typename Body>
53 class source_body_leaf : public source_body<Output> {
// Copies the argument twice: into 'body' and into 'init_body'. The member
// declarations themselves are not shown in this listing.
55 source_body_leaf( const Body &_body ) : body(_body), init_body(_body) { }
// Forwards the call to 'body' and returns its result.
56 /*override*/ bool operator()(Output &output) { return body( output ); }
// The clone is constructed from init_body (the copy taken at construction), not from
// 'body', which is the object operator() invokes. It is allocated with new; the code
// that deletes it is not shown here.
57 /*override*/ source_body_leaf* clone() {
58 return new source_body_leaf< Output, Body >(init_body);
65 //! A functor that takes an Input and generates an Output
// Abstract interface: operator() and clone() are pure virtual and the destructor is
// virtual. The class derives (privately, by the class default) from
// tbb::internal::no_assign, which is declared outside this listing.
66 template< typename Input, typename Output >
67 class function_body : tbb::internal::no_assign {
69 virtual ~function_body() {}
// Takes the input by const reference and returns the Output by value.
70 virtual Output operator()(const Input &input) = 0;
// Returns a pointer to a function_body; who owns the returned object is not shown here.
71 virtual function_body* clone() = 0;
74 //! the leaf for function_body
// Primary template: wraps a user-supplied body of type B.
75 template <typename Input, typename Output, typename B>
76 class function_body_leaf : public function_body< Input, Output > {
// Copies the argument into both 'body' and 'init_body' (declarations not shown here).
78 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
// Forwards the input to 'body' and returns whatever it returns.
79 Output operator()(const Input &i) { return body(i); }
// Returns a copy of 'body' (by value), i.e. the object operator() invokes.
80 B get_body() { return body; }
// The clone is constructed from init_body, the copy taken at construction, not from
// 'body'. It is allocated with new; the code that deletes it is not shown here.
81 /*override*/ function_body_leaf* clone() {
82 return new function_body_leaf< Input, Output, B >(init_body);
89 //! the leaf for function_body specialized for Input and output of continue_msg
// Partial specialization for continue_msg input and continue_msg output. Its template
// header line is not present in this listing.
91 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
// Copies the argument into both 'body' and 'init_body' (declarations not shown here).
93 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
// The statements of this operator() are not shown in this listing.
94 continue_msg operator()( const continue_msg &i ) {
// Returns a copy of 'body' (by value).
98 B get_body() { return body; }
// The clone is constructed from init_body, the copy taken at construction, not from
// 'body'. It is allocated with new.
99 /*override*/ function_body_leaf* clone() {
100 return new function_body_leaf< continue_msg, continue_msg, B >(init_body);
107 //! the leaf for function_body specialized for Output of continue_msg
// Partial specialization for any Input with a continue_msg output.
108 template <typename Input, typename B>
109 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
// Copies the argument into both 'body' and 'init_body' (declarations not shown here).
111 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
// Returns a default-constructed continue_msg. The statement between the header and
// the return is not shown here (presumably it invokes body(i) -- confirm).
112 continue_msg operator()(const Input &i) {
114 return continue_msg();
// Returns a copy of 'body' (by value).
116 B get_body() { return body; }
// The clone is constructed from init_body, the copy taken at construction, not from
// 'body'. It is allocated with new.
117 /*override*/ function_body_leaf* clone() {
118 return new function_body_leaf< Input, continue_msg, B >(init_body);
125 //! the leaf for function_body specialized for Input of continue_msg
// Partial specialization for a continue_msg input with any Output.
126 template <typename Output, typename B>
127 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
// Copies the argument into both 'body' and 'init_body' (declarations not shown here).
129 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
// The statements of this operator() are not shown in this listing.
130 Output operator()(const continue_msg &i) {
// Returns a copy of 'body' (by value).
133 B get_body() { return body; }
// The clone is constructed from init_body, the copy taken at construction, not from
// 'body'. It is allocated with new.
134 /*override*/ function_body_leaf* clone() {
135 return new function_body_leaf< continue_msg, Output, B >(init_body);
// Everything from here to the matching #endif is compiled only when
// TBB_PREVIEW_GRAPH_NODES evaluates to non-zero.
142 # if TBB_PREVIEW_GRAPH_NODES
143 //! function_body that takes an Input and a set of output ports
// Abstract interface: operator() and clone() are pure virtual and the destructor is
// virtual. Unlike function_body, no base class appears on the class line.
144 template<typename Input, typename OutputSet>
145 class multioutput_function_body {
147 virtual ~multioutput_function_body () {}
// Returns void. The output set is passed by non-const reference, so an implementation
// can act on it directly.
148 virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
// Returns a pointer to a multioutput_function_body; ownership is not shown here.
149 virtual multioutput_function_body* clone() = 0;
152 //! leaf for multi-output function. OutputSet can be a std::tuple or a vector.
// Concrete multioutput_function_body that wraps a user-supplied body of type B.
153 template<typename Input, typename OutputSet, typename B>
154 class multioutput_function_body_leaf : public multioutput_function_body<Input, OutputSet> {
// Copies the argument into both 'body' and 'init_body' (declarations not shown here).
156 multioutput_function_body_leaf(const B &_body) : body(_body), init_body(_body) { }
// Passes both the input and the output set straight to 'body'; nothing is returned.
157 void operator()(const Input &input, OutputSet &oset) {
158 body(input, oset); // body should explicitly put() to one or more of oset.
// Returns a copy of 'body' (by value).
160 B get_body() { return body; }
// The clone is constructed from init_body, the copy taken at construction, not from
// 'body'. It is allocated with new.
161 /*override*/ multioutput_function_body_leaf* clone() {
162 return new multioutput_function_body_leaf<Input, OutputSet,B>(init_body);
168 #endif // TBB_PREVIEW_GRAPH_NODES
170 //! A task that calls a node's forward function
// Derives from task. Only the constructor is visible in this listing; the my_node
// declaration and the function that makes the forward call are not shown.
171 template< typename NodeType >
172 class forward_task : public task {
// Initializes my_node from the node, which is passed by non-const reference.
178 forward_task( NodeType &n ) : my_node(n) {}
186 //! A task that calls a node's apply_body function, passing in an input of type Input
// Derives from task. The member declarations and the header of the function that
// contains the apply_body call are not shown in this listing.
187 template< typename NodeType, typename Input >
188 class apply_body_task : public task {
// Initializes my_node from the node reference and my_input from the input reference.
195 apply_body_task( NodeType &n, const Input &i ) : my_node(n), my_input(i) {}
// Hands the stored input to the node's apply_body().
198 my_node.apply_body( my_input );
203 //! A task that calls a node's apply_body function with no input
// Derives from task. The my_node declaration and the header of the function that
// contains the apply_body call are not shown in this listing.
204 template< typename NodeType >
205 class source_task : public task {
// Initializes my_node from the node, which is passed by non-const reference.
211 source_task( NodeType &n ) : my_node(n) {}
// Calls the node's apply_body() with no arguments.
214 my_node.apply_body( );
219 //! An empty functor that takes an Input and returns a default constructed Output
// The header line of the enclosing type is not present in this listing.
220 template< typename Input, typename Output >
// Ignores its unnamed argument and returns Output(). The call operator is const.
222 Output operator()( const Input & ) const { return Output(); }
225 //! A node_cache maintains a std::queue of pointers to elements of type T. Each operation is protected by a lock.
// The mutex type M defaults to spin_mutex. The class header, several function headers
// and most one-line bodies are not present in this listing, so the notes below
// describe only the statements that are visible.
226 template< typename T, typename M=spin_mutex >
230 typedef size_t size_type;
// Visible pattern: take a scoped lock on my_mutex, then return internal_empty().
233 typename my_mutex_type::scoped_lock lock( my_mutex );
234 return internal_empty();
// Another operation that begins by locking my_mutex; its header and remaining
// statements are not shown here.
238 typename my_mutex_type::scoped_lock lock( my_mutex );
// remove(n): under the lock, loops once per element queued on entry (internal_size()
// is evaluated only in the loop initializer) and pops one element per iteration.
// What happens to the popped element is not shown -- presumably every element other
// than n is pushed back; confirm against the full source.
242 void remove( T &n ) {
243 typename my_mutex_type::scoped_lock lock( my_mutex );
244 for ( size_t i = internal_size(); i != 0; --i ) {
245 T &s = internal_pop();
// State: M is aliased as my_mutex_type, followed by the mutex instance and the
// queue, which stores T* rather than T.
254 typedef M my_mutex_type;
255 my_mutex_type my_mutex;
256 std::queue< T * > my_q;
// The four internal_* helpers below take no lock themselves; per their comments the
// caller must already hold my_mutex. Their bodies are not shown in this listing.
258 // Assumes lock is held
259 inline bool internal_empty( ) {
263 // Assumes lock is held
264 inline size_type internal_size( ) {
268 // Assumes lock is held
269 inline void internal_push( T &n ) {
273 // Assumes lock is held
// Returns a reference (T &), although the queue stores pointers.
274 inline T &internal_pop() {
282 //! A cache of predecessors that only supports try_get
// Built on node_cache< sender<T>, M >, so the cached elements are sender<T> objects.
// M defaults to spin_mutex.
283 template< typename T, typename M=spin_mutex >
284 class predecessor_cache : public node_cache< sender<T>, M > {
286 typedef M my_mutex_type;
287 typedef T output_type;
288 typedef sender<output_type> predecessor_type;
289 typedef receiver<output_type> successor_type;
// The owner starts out as NULL and is supplied later through set_owner().
291 predecessor_cache( ) : my_owner( NULL ) { }
293 void set_owner( successor_type *owner ) { my_owner = owner; }
// get_item(v): repeats the loop below for as long as msg is false. Some lines of the
// body (including the branch structure) are not shown in this listing.
295 bool get_item( output_type &v ) {
300 predecessor_type *src;
// Under the lock: check for an empty cache (the action taken in that case is not
// shown), otherwise pop one sender.
302 typename my_mutex_type::scoped_lock lock(this->my_mutex);
303 if ( this->internal_empty() ) {
306 src = &this->internal_pop();
309 // Try to get from this sender
310 msg = src->try_get( v );
313 // Relinquish ownership of the edge
// Registers my_owner as a successor of src.
// NOTE(review): my_owner is dereferenced here and is NULL until set_owner() is called.
315 src->register_successor( *my_owner );
317 // Retain ownership of the edge
320 } while ( msg == false );
// The receiver on whose behalf this cache operates; see set_owner().
325 successor_type *my_owner;
328 //! A cache of predecessors that supports requests and reservations
// Extends predecessor_cache< T, M >. The sender involved in a reservation is tracked
// in reserved_src. M defaults to spin_mutex.
329 template< typename T, typename M=spin_mutex >
330 class reservable_predecessor_cache : public predecessor_cache< T, M > {
332 typedef M my_mutex_type;
333 typedef T output_type;
334 typedef sender<T> predecessor_type;
335 typedef receiver<T> successor_type;
// Starts with no reserved sender.
337 reservable_predecessor_cache( ) : reserved_src(NULL) { }
// try_reserve(v): repeats the loop below for as long as msg is false. The return type
// line and parts of the body (including the branch structure) are not shown here.
340 try_reserve( output_type &v ) {
// Under the lock: the condition is true when a sender is already held in reserved_src
// or the cache is empty (the action taken then is not shown); otherwise one sender is
// popped into reserved_src.
345 typename my_mutex_type::scoped_lock lock(this->my_mutex);
346 if ( reserved_src || this->internal_empty() )
349 reserved_src = &this->internal_pop();
352 // Try to get from this sender
353 msg = reserved_src->try_reserve( v );
// The lock is taken again around the hand-back below, which registers the owner as a
// successor of the sender.
356 typename my_mutex_type::scoped_lock lock(this->my_mutex);
357 // Relinquish ownership of the edge
358 reserved_src->register_successor( *this->my_owner );
361 // Retain ownership of the edge
// Puts the sender back into the cache through add().
362 this->add( *reserved_src );
364 } while ( msg == false );
// The next two calls forward to the reserved sender's try_release() and
// try_consume(); the enclosing function headers are not shown in this listing.
371 reserved_src->try_release( );
378 reserved_src->try_consume( );
// Sender tracked by the reservation logic; NULL after construction.
384 predecessor_type *reserved_src;
388 //! An abstract cache of successors
// Holds a std::list of receiver<T> pointers guarded by a mutex of type M (default
// spin_rw_mutex). try_put() is pure virtual, so derived classes decide how items are
// delivered. Derives (privately, by the class default) from tbb::internal::no_copy.
389 template<typename T, typename M=spin_rw_mutex >
390 class successor_cache : tbb::internal::no_copy {
393 typedef M my_mutex_type;
394 my_mutex_type my_mutex;
396 typedef std::list< receiver<T> * > my_successors_type;
397 my_successors_type my_successors;
// The owner starts out as NULL and is supplied later through set_owner(). The
// my_owner declaration is not shown in this listing.
403 successor_cache( ) : my_owner(NULL) {}
405 void set_owner( sender<T> *owner ) { my_owner = owner; }
407 virtual ~successor_cache() {}
// Appends the receiver's address to the list. The scoped_lock is constructed with
// 'true' as its second argument (for spin_rw_mutex this requests a writer lock).
409 void register_successor( receiver<T> &r ) {
410 typename my_mutex_type::scoped_lock l(my_mutex, true);
411 my_successors.push_back( &r );
// Walks the list and erases an entry. The test that selects the entry, and what
// follows the erase, are not shown here. Same lock argument as register_successor.
414 void remove_successor( receiver<T> &r ) {
415 typename my_mutex_type::scoped_lock l(my_mutex, true);
416 for ( typename my_successors_type::iterator i = my_successors.begin();
417 i != my_successors.end(); ++i ) {
419 my_successors.erase(i);
// Reports whether the list is empty; the lock is constructed with 'false' here
// (a reader lock for spin_rw_mutex). The function header is not shown.
426 typename my_mutex_type::scoped_lock l(my_mutex, false);
427 return my_successors.empty();
// Delivery policy, implemented by derived classes.
430 virtual bool try_put( const T &t ) = 0;
433 //! An abstract cache of successors, specialized to continue_msg
// Same layout as the general template, with the mutex type fixed to spin_rw_mutex.
// The visible differences are the register_predecessor / remove_predecessor calls
// made on the receiver side. The template header line is not shown in this listing.
435 class successor_cache< continue_msg > : tbb::internal::no_copy {
438 typedef spin_rw_mutex my_mutex_type;
439 my_mutex_type my_mutex;
441 typedef std::list< receiver<continue_msg> * > my_successors_type;
442 my_successors_type my_successors;
444 sender<continue_msg> *my_owner;
// The owner starts out as NULL and is supplied later through set_owner().
448 successor_cache( ) : my_owner(NULL) {}
450 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
452 virtual ~successor_cache() {}
// Appends the receiver under a lock constructed with 'true' (writer), then attempts a
// dynamic_cast to continue_receiver and calls register_predecessor on the result
// with the owner. The condition guarding that call is not shown here.
// NOTE(review): my_owner is dereferenced below and is NULL until set_owner() is called.
454 void register_successor( receiver<continue_msg> &r ) {
455 my_mutex_type::scoped_lock l(my_mutex, true);
456 my_successors.push_back( &r );
458 continue_receiver *cr = dynamic_cast< continue_receiver * >(&r);
460 cr->register_predecessor( *my_owner );
// Walks the list; for the selected entry (the test is not shown) the owner is first
// removed as a predecessor of r and then the list entry is erased.
464 void remove_successor( receiver<continue_msg> &r ) {
465 my_mutex_type::scoped_lock l(my_mutex, true);
466 for ( my_successors_type::iterator i = my_successors.begin();
467 i != my_successors.end(); ++i ) {
470 r.remove_predecessor( *my_owner );
471 my_successors.erase(i);
// Reports whether the list is empty under a lock constructed with 'false' (reader).
// The function header is not shown.
478 my_mutex_type::scoped_lock l(my_mutex, false);
479 return my_successors.empty();
// Delivery policy, implemented by derived classes.
482 virtual bool try_put( const continue_msg &t ) = 0;
486 //! A cache of successors that are broadcast to
// Derived from successor_cache<T, M>; supplies try_put(). M defaults to spin_rw_mutex.
487 template<typename T, typename M=spin_rw_mutex>
488 class broadcast_cache : public successor_cache<T, M> {
489 typedef M my_mutex_type;
490 typedef std::list< receiver<T> * > my_successors_type;
494 broadcast_cache( ) {}
// try_put(t): iterates over the successor list, starting under a lock constructed with
// 'false' (reader), and offers t to each entry. Several lines (the branch bodies, the
// iterator advance and the return) are not shown in this listing.
496 bool try_put( const T &t ) {
// Presumably records whether the lock has already been upgraded -- the code that
// reads or sets it is not shown; confirm.
498 bool upgraded = false;
499 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
500 typename my_successors_type::iterator i = this->my_successors.begin();
501 while ( i != this->my_successors.end() ) {
502 if ( (*i)->try_put( t ) == true ) {
// When the successor's register_predecessor(owner) returns true, the lock is upgraded
// to a writer and the entry is erased; erase() yields the iterator to the next entry.
// Presumably this path is taken only after try_put was rejected -- confirm.
506 if ( (*i)->register_predecessor(*this->my_owner) ) {
508 l.upgrade_to_writer();
511 i = this->my_successors.erase(i);
522 //! A cache of successors that are put in a round-robin fashion
// Derived from successor_cache<T, M>; supplies try_put() and a size query.
// M defaults to spin_rw_mutex.
523 template<typename T, typename M=spin_rw_mutex >
524 class round_robin_cache : public successor_cache<T, M> {
525 typedef size_t size_type;
526 typedef M my_mutex_type;
527 typedef std::list< receiver<T> * > my_successors_type;
531 round_robin_cache( ) {}
// Returns the number of successors under a lock constructed with 'false' (reader).
// The function header is not shown in this listing.
534 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
535 return this->my_successors.size();
// try_put(t): iterates over the successor list from the front, starting under a reader
// lock, and offers t to each entry. Several lines (the branch bodies, the iterator
// advance and the return) are not shown in this listing.
538 bool try_put( const T &t ) {
// Presumably records whether the lock has already been upgraded -- the code that
// reads or sets it is not shown; confirm.
539 bool upgraded = false;
540 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
541 typename my_successors_type::iterator i = this->my_successors.begin();
542 while ( i != this->my_successors.end() ) {
543 if ( (*i)->try_put( t ) ) {
// When the successor's register_predecessor(owner) returns true, the lock is upgraded
// to a writer and the entry is erased; erase() yields the iterator to the next entry.
// Presumably this path is taken only after try_put was rejected -- confirm.
546 if ( (*i)->register_predecessor(*this->my_owner) ) {
548 l.upgrade_to_writer();
551 i = this->my_successors.erase(i);
// A continue_receiver that calls decrement_counter() on the node set through
// set_owner(). The template header, the my_node declaration and the header of the
// function containing the decrement_counter() call are not shown in this listing.
563 class decrementer : public continue_receiver, tbb::internal::no_copy {
// NOTE(review): no initialization of my_node is visible, so set_owner() presumably
// has to be called before this statement runs -- confirm.
568 my_node->decrement_counter();
573 typedef continue_msg input_type;
574 typedef continue_msg output_type;
// Passes the predecessor count (default 0) to the continue_receiver base.
575 decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
// Stores the node whose decrement_counter() is called.
576 void set_owner( T *node ) { my_node = node; }
581 #endif // __TBB__flow_graph_impl_H