]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/internal/_flow_graph_node_impl.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / internal / _flow_graph_node_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB__graph_node_internal_H
30 #define __TBB__graph_node_internal_H
31
32 #include "_flow_graph_item_buffer_impl.h"
33
34 //! @cond INTERNAL
35 namespace internal {
36
37     using tbb::internal::aggregated_operation;
38     using tbb::internal::aggregating_functor;
39     using tbb::internal::aggregator;
40
41      template< typename T, typename A >
42      class function_input_queue : public item_buffer<T,A> {
43      public:
44          bool pop( T& t ) {
45              return this->pop_front( t );
46          }
47
48          bool push( T& t ) {
49              return this->push_back( t );
50          }
51      };
52
53     //! Implements methods for a function node that takes a type T as input
54     template< typename Input, typename Output, typename A >
55     class function_input : public receiver<Input>, tbb::internal::no_assign {
56         typedef sender<Input> predecessor_type;
57         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
58         enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
59         typedef function_input<Input, Output, A> my_class;
60         
61     public:
62
63         //! The input type of this receiver
64         typedef Input input_type;
65         //! The output type of this receiver
66         typedef Output output_type;
67         
68         //! Constructor for function_input
69         template< typename Body >
70         function_input( graph &g, size_t max_concurrency, Body& body, function_input_queue<input_type,A> *q = NULL )
71             : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(0),
72               my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
73               my_queue(q), forwarder_busy(false) {
74             my_predecessors.set_owner(this);
75             my_aggregator.initialize_handler(my_handler(this));
76         }
77         
78         //! Copy constructor
79         function_input( const function_input& src, function_input_queue<input_type,A> *q = NULL ) : 
80 #if (__TBB_GCC_VERSION < 40202 )
81             receiver<Input>(), tbb::internal::no_assign(),
82 #endif
83             my_root_task( src.my_root_task), my_max_concurrency(src.my_max_concurrency),
84             my_concurrency(0), my_body( src.my_body->clone() ), my_queue(q), forwarder_busy(false)
85         {
86             my_predecessors.set_owner(this);
87             my_aggregator.initialize_handler(my_handler(this));
88         }
89
90         //! Destructor
91         virtual ~function_input() { 
92             delete my_body; 
93             if ( my_queue ) delete my_queue;
94         }
95         
96         //! Put to the node
97         virtual bool try_put( const input_type &t ) {
98            if ( my_max_concurrency == 0 ) {
99                spawn_body_task( t );
100                return true;
101            } else {
102                my_operation op_data(t, tryput);
103                my_aggregator.execute(&op_data);
104                return op_data.status == SUCCEEDED;
105            }
106         }
107         
108         //! Adds src to the list of cached predecessors.
109         /* override */ bool register_predecessor( predecessor_type &src ) {
110             my_operation op_data(reg_pred);
111             op_data.r = &src;
112             my_aggregator.execute(&op_data);
113             return true;
114         }
115         
116         //! Removes src from the list of cached predecessors.
117         /* override */ bool remove_predecessor( predecessor_type &src ) {
118             my_operation op_data(rem_pred);
119             op_data.r = &src;
120             my_aggregator.execute(&op_data);
121             return true;
122         }
123
124         template< typename Body >
125         Body copy_function_object() {
126             internal::function_body<input_type, output_type> &body_ref = *this->my_body;
127             return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 
128         } 
129         
130     protected:
131
132         task *my_root_task;
133         const size_t my_max_concurrency;
134         size_t my_concurrency;
135         function_body<input_type, output_type> *my_body;
136         function_input_queue<input_type, A> *my_queue;
137         predecessor_cache<input_type, null_mutex > my_predecessors;
138         
139         virtual broadcast_cache<output_type > &successors() = 0;
140
141     private:
142
143         friend class apply_body_task< my_class, input_type >;
144         friend class forward_task< my_class >;
145         
146         class my_operation : public aggregated_operation< my_operation > {
147         public:
148             char type;
149             union {
150                 input_type *elem;
151                 predecessor_type *r;
152             };
153             my_operation(const input_type& e, op_type t) :
154                 type(char(t)), elem(const_cast<input_type*>(&e)) {}
155             my_operation(op_type t) : type(char(t)), r(NULL) {}
156         };
157         
158         bool forwarder_busy;
159         typedef internal::aggregating_functor<my_class, my_operation> my_handler;
160         friend class internal::aggregating_functor<my_class, my_operation>;
161         aggregator< my_handler, my_operation > my_aggregator;
162         
163         void handle_operations(my_operation *op_list) {
164             my_operation *tmp;
165             while (op_list) {
166                 tmp = op_list;
167                 op_list = op_list->next;
168                 switch (tmp->type) {
169                 case reg_pred:
170                     my_predecessors.add(*(tmp->r));
171                     __TBB_store_with_release(tmp->status, SUCCEEDED);
172                     if (!forwarder_busy) {
173                         forwarder_busy = true;
174                         spawn_forward_task();
175                     }
176                     break;
177                 case rem_pred:
178                     my_predecessors.remove(*(tmp->r));
179                     __TBB_store_with_release(tmp->status, SUCCEEDED);
180                     break;
181                 case app_body:
182                     __TBB_ASSERT(my_max_concurrency != 0, NULL);
183                     --my_concurrency;
184                     __TBB_store_with_release(tmp->status, SUCCEEDED);
185                     if (my_concurrency<my_max_concurrency) {
186                         input_type i;
187                         bool item_was_retrieved = false;
188                         if ( my_queue )
189                             item_was_retrieved = my_queue->pop(i);
190                         else
191                             item_was_retrieved = my_predecessors.get_item(i);
192                         if (item_was_retrieved) {
193                             ++my_concurrency;
194                             spawn_body_task(i);
195                         }
196                     }
197                     break;
198                 case tryput: internal_try_put(tmp);  break;
199                 case try_fwd: internal_forward(tmp);  break;
200                 }
201             }
202         }
203         
204         //! Put to the node
205         void internal_try_put(my_operation *op) {
206             __TBB_ASSERT(my_max_concurrency != 0, NULL);
207             if (my_concurrency < my_max_concurrency) {
208                ++my_concurrency;
209                spawn_body_task(*(op->elem));
210                __TBB_store_with_release(op->status, SUCCEEDED);
211            } else if ( my_queue && my_queue->push(*(op->elem)) ) { 
212                __TBB_store_with_release(op->status, SUCCEEDED);
213            } else {
214                __TBB_store_with_release(op->status, FAILED);
215            }
216         }
217         
218         //! Tries to spawn bodies if available and if concurrency allows
219         void internal_forward(my_operation *op) {
220             if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
221                 input_type i;
222                 bool item_was_retrieved = false;
223                 if ( my_queue )
224                     item_was_retrieved = my_queue->pop(i);
225                 else
226                     item_was_retrieved = my_predecessors.get_item(i);
227                 if (item_was_retrieved) {
228                     ++my_concurrency;
229                     __TBB_store_with_release(op->status, SUCCEEDED);
230                     spawn_body_task(i);
231                     return;
232                 }
233             }
234             __TBB_store_with_release(op->status, FAILED);
235             forwarder_busy = false;
236         }
237         
238         //! Applies the body to the provided input
239         void apply_body( input_type &i ) {
240             successors().try_put( (*my_body)(i) );
241             if ( my_max_concurrency != 0 ) {
242                 my_operation op_data(app_body);
243                 my_aggregator.execute(&op_data);
244             }
245         }
246         
247        //! Spawns a task that calls apply_body( input )
248        inline void spawn_body_task( const input_type &input ) {
249            task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task< my_class, input_type >(*this, input));
250        }
251         
252        //! This is executed by an enqueued task, the "forwarder"
253        void forward() {
254            my_operation op_data(try_fwd);
255            do {
256                op_data.status = WAIT;
257                my_aggregator.execute(&op_data);
258            } while (op_data.status == SUCCEEDED);
259        }
260         
261        //! Spawns a task that calls forward()
262        inline void spawn_forward_task() {
263            task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task< my_class >(*this));
264        }
265     };
266         
267     //! Implements methods for an executable node that takes continue_msg as input
268     template< typename Output >
269     class continue_input : public continue_receiver {
270     public:
271         
272         //! The input type of this receiver
273         typedef continue_msg input_type;
274             
275         //! The output type of this receiver
276         typedef Output output_type;
277         
278         template< typename Body >
279         continue_input( graph &g, Body& body )
280             : my_root_task(g.root_task()), 
281              my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
282         
283         template< typename Body >
284         continue_input( graph &g, int number_of_predecessors, Body& body )
285             : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()), 
286              my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
287
288         continue_input( const continue_input& src ) : continue_receiver(src), 
289             my_root_task(src.my_root_task), my_body( src.my_body->clone() ) {}
290
291         template< typename Body >
292         Body copy_function_object() {
293             internal::function_body<input_type, output_type> &body_ref = *my_body;
294             return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 
295         } 
296
297     protected:
298         
299         task *my_root_task;
300         function_body<input_type, output_type> *my_body;
301         
302         virtual broadcast_cache<output_type > &successors() = 0; 
303         
304         friend class apply_body_task< continue_input< Output >, continue_msg >;
305         
306         //! Applies the body to the provided input
307         /* override */ void apply_body( input_type ) {
308             successors().try_put( (*my_body)( continue_msg() ) );
309         }
310         
311         //! Spawns a task that applies the body
312         /* override */ void execute( ) {
313             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
314                apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) ); 
315         }
316
317     };
318         
319     //! Implements methods for both executable and function nodes that puts Output to its successors
320     template< typename Output >
321     class function_output : public sender<Output> {
322     public:
323         
324         typedef Output output_type;
325         
326         function_output() { }
327         
328         //! Adds a new successor to this node
329         /* override */ bool register_successor( receiver<output_type> &r ) {
330             successors().register_successor( r );
331             return true;
332         }
333         
334         //! Removes a successor from this node
335         /* override */ bool remove_successor( receiver<output_type> &r ) {
336             successors().remove_successor( r );
337             return true;
338         }
339           
340     protected:
341         
342         virtual broadcast_cache<output_type > &successors() = 0; 
343         
344     };
345
346 }
347
348 #endif
349