]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/internal/_flow_graph_impl.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / internal / _flow_graph_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB__graph_internal_H
30 #define __TBB__graph_internal_H
31
32 namespace internal {
33
34     namespace graph_policy_namespace {
35         enum graph_buffer_policy { rejecting, reserving, queueing, tag_matching };
36     }
37
38     //! A functor that takes no input and generates a value of type Output
39     template< typename Output >
40     class source_body : tbb::internal::no_assign {
41     public:
42         virtual ~source_body() {}
43         virtual bool operator()(Output &output) = 0;
44         virtual source_body* clone() = 0;
45     };
46     
47     //! The leaf for source_body
48     template< typename Output, typename Body>
49     class source_body_leaf : public source_body<Output> {
50     public:
51         source_body_leaf( const Body &_body ) : body(_body), init_body(_body) { }
52         /*override*/ bool operator()(Output &output) { return body( output ); }
53         /*override*/ source_body_leaf* clone() { 
54             return new source_body_leaf< Output, Body >(init_body); 
55         }
56     private:
57         Body body;
58         Body init_body;
59     };
60     
61     //! A functor that takes an Input and generates an Output
62     template< typename Input, typename Output >
63     class function_body : tbb::internal::no_assign {
64     public:
65         virtual ~function_body() {}
66         virtual Output operator()(const Input &input) = 0;
67         virtual function_body* clone() = 0;
68     };
69     
70     //! the leaf for function_body
71     template <typename Input, typename Output, typename B>
72     class function_body_leaf : public function_body< Input, Output > {
73     public:
74         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
75         Output operator()(const Input &i) { return body(i); }
76         B get_body() { return body; }
77         /*override*/ function_body_leaf* clone() {
78             return new function_body_leaf< Input, Output, B >(init_body);
79         }
80     private:
81         B body;
82         B init_body;
83     };
84     
85     //! the leaf for function_body specialized for Input and output of continue_msg
86     template <typename B>
87     class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
88     public:
89         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
90         continue_msg operator()( const continue_msg &i ) { 
91             body(i); 
92             return i; 
93         }
94         B get_body() { return body; }
95         /*override*/ function_body_leaf* clone() {
96            return new function_body_leaf< continue_msg, continue_msg, B >(init_body);
97         }    
98     private:
99         B body;
100         B init_body;
101     };
102     
103     //! the leaf for function_body specialized for Output of continue_msg
104     template <typename Input, typename B>
105     class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
106     public:
107         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
108         continue_msg operator()(const Input &i) { 
109             body(i); 
110             return continue_msg();
111         }
112         B get_body() { return body; }
113         /*override*/ function_body_leaf* clone() {
114             return new function_body_leaf< Input, continue_msg, B >(init_body);
115         }    
116     private:
117         B body;
118         B init_body;
119     };
120     
121     //! the leaf for function_body specialized for Input of continue_msg
122     template <typename Output, typename B>
123     class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
124     public:
125         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
126         Output operator()(const continue_msg &i) { 
127             return body(i); 
128         }
129         B get_body() { return body; }
130         /*override*/ function_body_leaf* clone() {
131             return new function_body_leaf< continue_msg, Output, B >(init_body);
132         }    
133     private:
134         B body;
135         B init_body;
136     };
137     
138     //! A task that calls a node's forward function
139     template< typename NodeType >
140     class forward_task : public task {
141     
142         NodeType &my_node;
143     
144     public:
145     
146         forward_task( NodeType &n ) : my_node(n) {}
147     
148         task *execute() {
149             my_node.forward();
150             return NULL;
151         }
152     };
153     
154     //! A task that calls a node's apply_body function, passing in an input of type Input
155     template< typename NodeType, typename Input >
156     class apply_body_task : public task {
157     
158         NodeType &my_node;
159         Input my_input;
160         
161     public:
162         
163         apply_body_task( NodeType &n, const Input &i ) : my_node(n), my_input(i) {}
164         
165         task *execute() {
166             my_node.apply_body( my_input );
167             return NULL;
168         }
169     };
170     
171     //! A task that calls a node's apply_body function with no input
172     template< typename NodeType >
173     class source_task : public task {
174     
175         NodeType &my_node;
176     
177     public:
178     
179         source_task( NodeType &n ) : my_node(n) {}
180     
181         task *execute() {
182             my_node.apply_body( );
183             return NULL;
184         }
185     };
186     
187     //! An empty functor that takes an Input and returns a default constructed Output
188     template< typename Input, typename Output >
189     struct empty_body {
190        Output operator()( const Input & ) const { return Output(); } 
191     };
192     
193     //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock. 
194     template< typename T, typename M=spin_mutex >
195     class node_cache {
196         public:
197     
198         typedef size_t size_type;
199         
200         bool empty() {
201             typename my_mutex_type::scoped_lock lock( my_mutex );
202             return internal_empty();
203         }
204     
205         void add( T &n ) {
206             typename my_mutex_type::scoped_lock lock( my_mutex );
207             internal_push(n);
208         }
209     
210         void remove( T &n ) {
211             typename my_mutex_type::scoped_lock lock( my_mutex );
212             for ( size_t i = internal_size(); i != 0; --i ) {
213                 T &s = internal_pop();
214                 if ( &s != &n ) {
215                     internal_push(s);
216                 }
217             }
218         }
219         
220     protected:
221     
222         typedef M my_mutex_type;
223         my_mutex_type my_mutex;
224         std::queue< T * > my_q;
225     
226         // Assumes lock is held
227         inline bool internal_empty( )  {
228             return my_q.empty();
229         }
230     
231         // Assumes lock is held
232         inline size_type internal_size( )  {
233             return my_q.size(); 
234         }
235     
236         // Assumes lock is held
237         inline void internal_push( T &n )  {
238             my_q.push(&n);
239         }
240     
241         // Assumes lock is held
242         inline T &internal_pop() {
243             T *v = my_q.front();
244             my_q.pop();
245             return *v;
246         }
247     
248     };
249     
250     //! A cache of predecessors that only supports try_get
251     template< typename T, typename M=spin_mutex >
252     class predecessor_cache : public node_cache< sender<T>, M > {
253         public:
254         typedef M my_mutex_type;
255         typedef T output_type; 
256         typedef sender<output_type> predecessor_type;
257         typedef receiver<output_type> successor_type;
258     
259         predecessor_cache( ) : my_owner( NULL ) { }
260         
261         void set_owner( successor_type *owner ) { my_owner = owner; }
262         
263         bool get_item( output_type &v ) {
264         
265             bool msg = false;
266         
267             do {
268                 predecessor_type *src;
269                 {
270                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
271                     if ( this->internal_empty() ) {
272                         break;
273                     }
274                     src = &this->internal_pop();
275                 }
276         
277                 // Try to get from this sender
278                 msg = src->try_get( v );
279         
280                 if (msg == false) {
281                     // Relinquish ownership of the edge
282                     if ( my_owner) 
283                         src->register_successor( *my_owner );
284                 } else {
285                     // Retain ownership of the edge
286                     this->add(*src);
287                 }
288             } while ( msg == false );
289             return msg;
290         }
291     
292     protected:
293         successor_type *my_owner;
294     };
295     
296     //! An cache of predecessors that supports requests and reservations
297     template< typename T, typename M=spin_mutex >
298     class reservable_predecessor_cache : public predecessor_cache< T, M > {
299     public:
300         typedef M my_mutex_type;
301         typedef T output_type; 
302         typedef sender<T> predecessor_type;
303         typedef receiver<T> successor_type;
304         
305         reservable_predecessor_cache( ) : reserved_src(NULL) { }
306         
307         bool 
308         try_reserve( output_type &v ) {
309             bool msg = false;
310         
311             do {
312                 {
313                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
314                     if ( reserved_src || this->internal_empty() ) 
315                         return false;
316         
317                     reserved_src = &this->internal_pop();
318                 }
319         
320                 // Try to get from this sender
321                 msg = reserved_src->try_reserve( v );
322         
323                 if (msg == false) {
324                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
325                     // Relinquish ownership of the edge
326                     reserved_src->register_successor( *this->my_owner );
327                     reserved_src = NULL;
328                 } else {
329                     // Retain ownership of the edge
330                     this->add( *reserved_src );
331                 }
332             } while ( msg == false );
333         
334             return msg;
335         }
336         
337         bool 
338         try_release( ) {
339             reserved_src->try_release( );
340             reserved_src = NULL;
341             return true;
342         }
343         
344         bool 
345         try_consume( ) {
346             reserved_src->try_consume( );
347             reserved_src = NULL;
348             return true;
349         }
350     
351     private:
352         predecessor_type *reserved_src;
353     };
354     
355     
356     //! An abstract cache of succesors
357     template<typename T, typename M=spin_rw_mutex >
358     class successor_cache : tbb::internal::no_copy {
359     protected:
360         
361         typedef M my_mutex_type;
362         my_mutex_type my_mutex;
363         
364         typedef std::list< receiver<T> * > my_successors_type;
365         my_successors_type my_successors;
366         
367         sender<T> *my_owner;
368         
369     public:
370         
371         successor_cache( ) : my_owner(NULL) {}
372         
373         void set_owner( sender<T> *owner ) { my_owner = owner; }
374         
375         virtual ~successor_cache() {}
376         
377         void register_successor( receiver<T> &r ) {
378             typename my_mutex_type::scoped_lock l(my_mutex, true);
379             my_successors.push_back( &r ); 
380         }
381     
382         void remove_successor( receiver<T> &r ) {
383             typename my_mutex_type::scoped_lock l(my_mutex, true);
384             for ( typename my_successors_type::iterator i = my_successors.begin();
385                   i != my_successors.end(); ++i ) { 
386                 if ( *i == & r ) { 
387                     my_successors.erase(i);
388                     break;
389                 }
390             }
391         }
392         
393         bool empty() { 
394             typename my_mutex_type::scoped_lock l(my_mutex, false);
395             return my_successors.empty(); 
396         }
397         
398         virtual bool try_put( const T &t ) = 0; 
399      };
400     
401     //! An abstract cache of succesors, specialized to continue_msg
402     template<>
403     class successor_cache< continue_msg > : tbb::internal::no_copy {
404     protected:
405         
406         typedef spin_rw_mutex my_mutex_type;
407         my_mutex_type my_mutex;
408         
409         typedef std::list< receiver<continue_msg> * > my_successors_type;
410         my_successors_type my_successors;
411         
412         sender<continue_msg> *my_owner;
413         
414     public:
415         
416         successor_cache( ) : my_owner(NULL) {}
417         
418         void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
419         
420         virtual ~successor_cache() {}
421         
422         void register_successor( receiver<continue_msg> &r ) {
423             my_mutex_type::scoped_lock l(my_mutex, true);
424             my_successors.push_back( &r ); 
425             if ( my_owner )
426                 r.register_predecessor( *my_owner );
427         }
428         
429         void remove_successor( receiver<continue_msg> &r ) {
430             my_mutex_type::scoped_lock l(my_mutex, true);
431             for ( my_successors_type::iterator i = my_successors.begin();
432                   i != my_successors.end(); ++i ) { 
433                 if ( *i == & r ) { 
434                     if ( my_owner )
435                         r.remove_predecessor( *my_owner );
436                     my_successors.erase(i);
437                     break;
438                 }
439             }
440         }
441     
442         bool empty() { 
443             my_mutex_type::scoped_lock l(my_mutex, false);
444             return my_successors.empty(); 
445         }
446     
447         virtual bool try_put( const continue_msg &t ) = 0; 
448         
449      };
450     
451     //! A cache of successors that are broadcast to
452     template<typename T, typename M=spin_rw_mutex>
453     class broadcast_cache : public successor_cache<T, M> {
454         typedef M my_mutex_type;
455         typedef std::list< receiver<T> * > my_successors_type;
456         
457     public:
458         
459         broadcast_cache( ) {}
460         
461         bool try_put( const T &t ) {
462             bool msg = false;
463             bool upgraded = false;
464             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
465             typename my_successors_type::iterator i = this->my_successors.begin();
466             while ( i != this->my_successors.end() ) {
467                if ( (*i)->try_put( t ) == true ) {
468                    ++i;
469                    msg = true;
470                } else {
471                   if ( (*i)->register_predecessor(*this->my_owner) ) {
472                       if (!upgraded) {
473                           l.upgrade_to_writer();
474                           upgraded = true;
475                       }
476                       i = this->my_successors.erase(i);
477                   }
478                   else {
479                       ++i;
480                   }
481                }
482             }
483             return msg;
484         }
485     };
486     
487     //! A cache of successors that are put in a round-robin fashion
488     template<typename T, typename M=spin_rw_mutex >
489     class round_robin_cache : public successor_cache<T, M> {
490         typedef size_t size_type;
491         typedef M my_mutex_type;
492         typedef std::list< receiver<T> * > my_successors_type;
493     
494     public:
495         
496         round_robin_cache( ) {}
497         
498         size_type size() {
499             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
500             return this->my_successors.size();
501         }
502         
503         bool try_put( const T &t ) {
504             bool upgraded = false;
505             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
506             typename my_successors_type::iterator i = this->my_successors.begin();
507             while ( i != this->my_successors.end() ) {
508                if ( (*i)->try_put( t ) ) {
509                    return true;
510                } else {
511                   if ( (*i)->register_predecessor(*this->my_owner) ) {
512                       if (!upgraded) {
513                           l.upgrade_to_writer();
514                           upgraded = true;
515                       }
516                       i = this->my_successors.erase(i);
517                   }
518                   else {
519                       ++i;
520                   }
521                }
522             }
523             return false;
524         }
525     };
526     
527     template<typename T>
528     class decrementer : public continue_receiver, tbb::internal::no_copy {
529         
530         T *my_node;
531         
532         void execute() {
533             my_node->decrement_counter();
534         }
535         
536     public:
537        
538         typedef continue_msg input_type;
539         typedef continue_msg output_type;
540         decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
541         void set_owner( T *node ) { my_node = node; }
542     };
543     
544 }
545
546 #endif
547