2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB__graph_node_internal_H
30 #define __TBB__graph_node_internal_H
32 #include "_flow_graph_item_buffer_impl.h"
37 using tbb::internal::aggregated_operation;
38 using tbb::internal::aggregating_functor;
39 using tbb::internal::aggregator;
41 template< typename T, typename A >
42 class function_input_queue : public item_buffer<T,A> {
45 return this->pop_front( t );
49 return this->push_back( t );
53 //! Implements methods for a function node that takes a type T as input
54 template< typename Input, typename Output, typename A >
55 class function_input : public receiver<Input>, tbb::internal::no_assign {
56 typedef sender<Input> predecessor_type;
57 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
58 enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
59 typedef function_input<Input, Output, A> my_class;
63 //! The input type of this receiver
64 typedef Input input_type;
65 //! The output type of this receiver
66 typedef Output output_type;
68 //! Constructor for function_input
69 template< typename Body >
70 function_input( graph &g, size_t max_concurrency, Body& body, function_input_queue<input_type,A> *q = NULL )
71 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(0),
72 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
73 my_queue(q), forwarder_busy(false) {
74 my_predecessors.set_owner(this);
75 my_aggregator.initialize_handler(my_handler(this));
79 function_input( const function_input& src, function_input_queue<input_type,A> *q = NULL ) :
80 #if (__TBB_GCC_VERSION < 40202 )
81 receiver<Input>(), tbb::internal::no_assign(),
83 my_root_task( src.my_root_task), my_max_concurrency(src.my_max_concurrency),
84 my_concurrency(0), my_body( src.my_body->clone() ), my_queue(q), forwarder_busy(false)
86 my_predecessors.set_owner(this);
87 my_aggregator.initialize_handler(my_handler(this));
91 virtual ~function_input() {
93 if ( my_queue ) delete my_queue;
97 virtual bool try_put( const input_type &t ) {
98 if ( my_max_concurrency == 0 ) {
102 my_operation op_data(t, tryput);
103 my_aggregator.execute(&op_data);
104 return op_data.status == SUCCEEDED;
108 //! Adds src to the list of cached predecessors.
109 /* override */ bool register_predecessor( predecessor_type &src ) {
110 my_operation op_data(reg_pred);
112 my_aggregator.execute(&op_data);
116 //! Removes src from the list of cached predecessors.
117 /* override */ bool remove_predecessor( predecessor_type &src ) {
118 my_operation op_data(rem_pred);
120 my_aggregator.execute(&op_data);
124 template< typename Body >
125 Body copy_function_object() {
126 internal::function_body<input_type, output_type> &body_ref = *this->my_body;
127 return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
133 const size_t my_max_concurrency;
134 size_t my_concurrency;
135 function_body<input_type, output_type> *my_body;
136 function_input_queue<input_type, A> *my_queue;
137 predecessor_cache<input_type, null_mutex > my_predecessors;
139 virtual broadcast_cache<output_type > &successors() = 0;
143 friend class apply_body_task< my_class, input_type >;
144 friend class forward_task< my_class >;
146 class my_operation : public aggregated_operation< my_operation > {
153 my_operation(const input_type& e, op_type t) :
154 type(char(t)), elem(const_cast<input_type*>(&e)) {}
155 my_operation(op_type t) : type(char(t)), r(NULL) {}
159 typedef internal::aggregating_functor<my_class, my_operation> my_handler;
160 friend class internal::aggregating_functor<my_class, my_operation>;
161 aggregator< my_handler, my_operation > my_aggregator;
163 void handle_operations(my_operation *op_list) {
167 op_list = op_list->next;
170 my_predecessors.add(*(tmp->r));
171 __TBB_store_with_release(tmp->status, SUCCEEDED);
172 if (!forwarder_busy) {
173 forwarder_busy = true;
174 spawn_forward_task();
178 my_predecessors.remove(*(tmp->r));
179 __TBB_store_with_release(tmp->status, SUCCEEDED);
182 __TBB_ASSERT(my_max_concurrency != 0, NULL);
184 __TBB_store_with_release(tmp->status, SUCCEEDED);
185 if (my_concurrency<my_max_concurrency) {
187 bool item_was_retrieved = false;
189 item_was_retrieved = my_queue->pop(i);
191 item_was_retrieved = my_predecessors.get_item(i);
192 if (item_was_retrieved) {
198 case tryput: internal_try_put(tmp); break;
199 case try_fwd: internal_forward(tmp); break;
205 void internal_try_put(my_operation *op) {
206 __TBB_ASSERT(my_max_concurrency != 0, NULL);
207 if (my_concurrency < my_max_concurrency) {
209 spawn_body_task(*(op->elem));
210 __TBB_store_with_release(op->status, SUCCEEDED);
211 } else if ( my_queue && my_queue->push(*(op->elem)) ) {
212 __TBB_store_with_release(op->status, SUCCEEDED);
214 __TBB_store_with_release(op->status, FAILED);
218 //! Tries to spawn bodies if available and if concurrency allows
219 void internal_forward(my_operation *op) {
220 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
222 bool item_was_retrieved = false;
224 item_was_retrieved = my_queue->pop(i);
226 item_was_retrieved = my_predecessors.get_item(i);
227 if (item_was_retrieved) {
229 __TBB_store_with_release(op->status, SUCCEEDED);
234 __TBB_store_with_release(op->status, FAILED);
235 forwarder_busy = false;
238 //! Applies the body to the provided input
239 void apply_body( input_type &i ) {
240 successors().try_put( (*my_body)(i) );
241 if ( my_max_concurrency != 0 ) {
242 my_operation op_data(app_body);
243 my_aggregator.execute(&op_data);
247 //! Spawns a task that calls apply_body( input )
248 inline void spawn_body_task( const input_type &input ) {
249 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task< my_class, input_type >(*this, input));
252 //! This is executed by an enqueued task, the "forwarder"
254 my_operation op_data(try_fwd);
256 op_data.status = WAIT;
257 my_aggregator.execute(&op_data);
258 } while (op_data.status == SUCCEEDED);
261 //! Spawns a task that calls forward()
262 inline void spawn_forward_task() {
263 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task< my_class >(*this));
267 //! Implements methods for an executable node that takes continue_msg as input
268 template< typename Output >
269 class continue_input : public continue_receiver {
272 //! The input type of this receiver
273 typedef continue_msg input_type;
275 //! The output type of this receiver
276 typedef Output output_type;
278 template< typename Body >
279 continue_input( graph &g, Body& body )
280 : my_root_task(g.root_task()),
281 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
283 template< typename Body >
284 continue_input( graph &g, int number_of_predecessors, Body& body )
285 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()),
286 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
288 continue_input( const continue_input& src ) : continue_receiver(src),
289 my_root_task(src.my_root_task), my_body( src.my_body->clone() ) {}
291 template< typename Body >
292 Body copy_function_object() {
293 internal::function_body<input_type, output_type> &body_ref = *my_body;
294 return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
300 function_body<input_type, output_type> *my_body;
302 virtual broadcast_cache<output_type > &successors() = 0;
304 friend class apply_body_task< continue_input< Output >, continue_msg >;
306 //! Applies the body to the provided input
307 /* override */ void apply_body( input_type ) {
308 successors().try_put( (*my_body)( continue_msg() ) );
311 //! Spawns a task that applies the body
312 /* override */ void execute( ) {
313 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
314 apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) );
319 //! Implements methods for both executable and function nodes that puts Output to its successors
320 template< typename Output >
321 class function_output : public sender<Output> {
324 typedef Output output_type;
326 function_output() { }
328 //! Adds a new successor to this node
329 /* override */ bool register_successor( receiver<output_type> &r ) {
330 successors().register_successor( r );
334 //! Removes a successor from this node
335 /* override */ bool remove_successor( receiver<output_type> &r ) {
336 successors().remove_successor( r );
342 virtual broadcast_cache<output_type > &successors() = 0;