2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB__graph_internal_H
30 #define __TBB__graph_internal_H
34 namespace graph_policy_namespace {
35 enum graph_buffer_policy { rejecting, reserving, queueing, tag_matching };
38 //! A functor that takes no input and generates a value of type Output
39 template< typename Output >
40 class source_body : tbb::internal::no_assign {
42 virtual ~source_body() {}
43 virtual bool operator()(Output &output) = 0;
44 virtual source_body* clone() = 0;
47 //! The leaf for source_body
48 template< typename Output, typename Body>
49 class source_body_leaf : public source_body<Output> {
51 source_body_leaf( const Body &_body ) : body(_body), init_body(_body) { }
52 /*override*/ bool operator()(Output &output) { return body( output ); }
53 /*override*/ source_body_leaf* clone() {
54 return new source_body_leaf< Output, Body >(init_body);
61 //! A functor that takes an Input and generates an Output
62 template< typename Input, typename Output >
63 class function_body : tbb::internal::no_assign {
65 virtual ~function_body() {}
66 virtual Output operator()(const Input &input) = 0;
67 virtual function_body* clone() = 0;
70 //! the leaf for function_body
71 template <typename Input, typename Output, typename B>
72 class function_body_leaf : public function_body< Input, Output > {
74 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
75 Output operator()(const Input &i) { return body(i); }
76 B get_body() { return body; }
77 /*override*/ function_body_leaf* clone() {
78 return new function_body_leaf< Input, Output, B >(init_body);
85 //! the leaf for function_body specialized for Input and output of continue_msg
87 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
89 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
90 continue_msg operator()( const continue_msg &i ) {
94 B get_body() { return body; }
95 /*override*/ function_body_leaf* clone() {
96 return new function_body_leaf< continue_msg, continue_msg, B >(init_body);
103 //! the leaf for function_body specialized for Output of continue_msg
104 template <typename Input, typename B>
105 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
107 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
108 continue_msg operator()(const Input &i) {
110 return continue_msg();
112 B get_body() { return body; }
113 /*override*/ function_body_leaf* clone() {
114 return new function_body_leaf< Input, continue_msg, B >(init_body);
121 //! the leaf for function_body specialized for Input of continue_msg
122 template <typename Output, typename B>
123 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
125 function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
126 Output operator()(const continue_msg &i) {
129 B get_body() { return body; }
130 /*override*/ function_body_leaf* clone() {
131 return new function_body_leaf< continue_msg, Output, B >(init_body);
138 //! A task that calls a node's forward function
139 template< typename NodeType >
140 class forward_task : public task {
146 forward_task( NodeType &n ) : my_node(n) {}
154 //! A task that calls a node's apply_body function, passing in an input of type Input
155 template< typename NodeType, typename Input >
156 class apply_body_task : public task {
163 apply_body_task( NodeType &n, const Input &i ) : my_node(n), my_input(i) {}
166 my_node.apply_body( my_input );
171 //! A task that calls a node's apply_body function with no input
172 template< typename NodeType >
173 class source_task : public task {
179 source_task( NodeType &n ) : my_node(n) {}
182 my_node.apply_body( );
187 //! An empty functor that takes an Input and returns a default constructed Output
188 template< typename Input, typename Output >
190 Output operator()( const Input & ) const { return Output(); }
193 //! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
194 template< typename T, typename M=spin_mutex >
198 typedef size_t size_type;
201 typename my_mutex_type::scoped_lock lock( my_mutex );
202 return internal_empty();
206 typename my_mutex_type::scoped_lock lock( my_mutex );
210 void remove( T &n ) {
211 typename my_mutex_type::scoped_lock lock( my_mutex );
212 for ( size_t i = internal_size(); i != 0; --i ) {
213 T &s = internal_pop();
222 typedef M my_mutex_type;
223 my_mutex_type my_mutex;
224 std::queue< T * > my_q;
226 // Assumes lock is held
227 inline bool internal_empty( ) {
231 // Assumes lock is held
232 inline size_type internal_size( ) {
236 // Assumes lock is held
237 inline void internal_push( T &n ) {
241 // Assumes lock is held
242 inline T &internal_pop() {
250 //! A cache of predecessors that only supports try_get
251 template< typename T, typename M=spin_mutex >
252 class predecessor_cache : public node_cache< sender<T>, M > {
254 typedef M my_mutex_type;
255 typedef T output_type;
256 typedef sender<output_type> predecessor_type;
257 typedef receiver<output_type> successor_type;
259 predecessor_cache( ) : my_owner( NULL ) { }
261 void set_owner( successor_type *owner ) { my_owner = owner; }
263 bool get_item( output_type &v ) {
268 predecessor_type *src;
270 typename my_mutex_type::scoped_lock lock(this->my_mutex);
271 if ( this->internal_empty() ) {
274 src = &this->internal_pop();
277 // Try to get from this sender
278 msg = src->try_get( v );
281 // Relinquish ownership of the edge
283 src->register_successor( *my_owner );
285 // Retain ownership of the edge
288 } while ( msg == false );
293 successor_type *my_owner;
296 //! An cache of predecessors that supports requests and reservations
297 template< typename T, typename M=spin_mutex >
298 class reservable_predecessor_cache : public predecessor_cache< T, M > {
300 typedef M my_mutex_type;
301 typedef T output_type;
302 typedef sender<T> predecessor_type;
303 typedef receiver<T> successor_type;
305 reservable_predecessor_cache( ) : reserved_src(NULL) { }
308 try_reserve( output_type &v ) {
313 typename my_mutex_type::scoped_lock lock(this->my_mutex);
314 if ( reserved_src || this->internal_empty() )
317 reserved_src = &this->internal_pop();
320 // Try to get from this sender
321 msg = reserved_src->try_reserve( v );
324 typename my_mutex_type::scoped_lock lock(this->my_mutex);
325 // Relinquish ownership of the edge
326 reserved_src->register_successor( *this->my_owner );
329 // Retain ownership of the edge
330 this->add( *reserved_src );
332 } while ( msg == false );
339 reserved_src->try_release( );
346 reserved_src->try_consume( );
352 predecessor_type *reserved_src;
356 //! An abstract cache of succesors
357 template<typename T, typename M=spin_rw_mutex >
358 class successor_cache : tbb::internal::no_copy {
361 typedef M my_mutex_type;
362 my_mutex_type my_mutex;
364 typedef std::list< receiver<T> * > my_successors_type;
365 my_successors_type my_successors;
371 successor_cache( ) : my_owner(NULL) {}
373 void set_owner( sender<T> *owner ) { my_owner = owner; }
375 virtual ~successor_cache() {}
377 void register_successor( receiver<T> &r ) {
378 typename my_mutex_type::scoped_lock l(my_mutex, true);
379 my_successors.push_back( &r );
382 void remove_successor( receiver<T> &r ) {
383 typename my_mutex_type::scoped_lock l(my_mutex, true);
384 for ( typename my_successors_type::iterator i = my_successors.begin();
385 i != my_successors.end(); ++i ) {
387 my_successors.erase(i);
394 typename my_mutex_type::scoped_lock l(my_mutex, false);
395 return my_successors.empty();
398 virtual bool try_put( const T &t ) = 0;
401 //! An abstract cache of succesors, specialized to continue_msg
403 class successor_cache< continue_msg > : tbb::internal::no_copy {
406 typedef spin_rw_mutex my_mutex_type;
407 my_mutex_type my_mutex;
409 typedef std::list< receiver<continue_msg> * > my_successors_type;
410 my_successors_type my_successors;
412 sender<continue_msg> *my_owner;
416 successor_cache( ) : my_owner(NULL) {}
418 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
420 virtual ~successor_cache() {}
422 void register_successor( receiver<continue_msg> &r ) {
423 my_mutex_type::scoped_lock l(my_mutex, true);
424 my_successors.push_back( &r );
426 r.register_predecessor( *my_owner );
429 void remove_successor( receiver<continue_msg> &r ) {
430 my_mutex_type::scoped_lock l(my_mutex, true);
431 for ( my_successors_type::iterator i = my_successors.begin();
432 i != my_successors.end(); ++i ) {
435 r.remove_predecessor( *my_owner );
436 my_successors.erase(i);
443 my_mutex_type::scoped_lock l(my_mutex, false);
444 return my_successors.empty();
447 virtual bool try_put( const continue_msg &t ) = 0;
451 //! A cache of successors that are broadcast to
452 template<typename T, typename M=spin_rw_mutex>
453 class broadcast_cache : public successor_cache<T, M> {
454 typedef M my_mutex_type;
455 typedef std::list< receiver<T> * > my_successors_type;
459 broadcast_cache( ) {}
461 bool try_put( const T &t ) {
463 bool upgraded = false;
464 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
465 typename my_successors_type::iterator i = this->my_successors.begin();
466 while ( i != this->my_successors.end() ) {
467 if ( (*i)->try_put( t ) == true ) {
471 if ( (*i)->register_predecessor(*this->my_owner) ) {
473 l.upgrade_to_writer();
476 i = this->my_successors.erase(i);
487 //! A cache of successors that are put in a round-robin fashion
488 template<typename T, typename M=spin_rw_mutex >
489 class round_robin_cache : public successor_cache<T, M> {
490 typedef size_t size_type;
491 typedef M my_mutex_type;
492 typedef std::list< receiver<T> * > my_successors_type;
496 round_robin_cache( ) {}
499 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
500 return this->my_successors.size();
503 bool try_put( const T &t ) {
504 bool upgraded = false;
505 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
506 typename my_successors_type::iterator i = this->my_successors.begin();
507 while ( i != this->my_successors.end() ) {
508 if ( (*i)->try_put( t ) ) {
511 if ( (*i)->register_predecessor(*this->my_owner) ) {
513 l.upgrade_to_writer();
516 i = this->my_successors.erase(i);
528 class decrementer : public continue_receiver, tbb::internal::no_copy {
533 my_node->decrement_counter();
538 typedef continue_msg input_type;
539 typedef continue_msg output_type;
540 decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
541 void set_owner( T *node ) { my_node = node; }