]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/internal/_flow_graph_impl.h
6e7ea56db151bcd9223011425a97014d73b406a9
[casparcg] / dependencies64 / tbb / include / tbb / internal / _flow_graph_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB__flow_graph_impl_H
30 #define __TBB__flow_graph_impl_H
31
32 #ifndef __TBB_flow_graph_H
33 #error Do not #include this internal file directly; use public TBB headers instead.
34 #endif
35
36 namespace internal {
37
38     namespace graph_policy_namespace {
39         enum graph_buffer_policy { rejecting, reserving, queueing, tag_matching };
40     }
41
42     //! A functor that takes no input and generates a value of type Output
43     template< typename Output >
44     class source_body : tbb::internal::no_assign {
45     public:
46         virtual ~source_body() {}
47         virtual bool operator()(Output &output) = 0;
48         virtual source_body* clone() = 0;
49     };
50     
51     //! The leaf for source_body
52     template< typename Output, typename Body>
53     class source_body_leaf : public source_body<Output> {
54     public:
55         source_body_leaf( const Body &_body ) : body(_body), init_body(_body) { }
56         /*override*/ bool operator()(Output &output) { return body( output ); }
57         /*override*/ source_body_leaf* clone() { 
58             return new source_body_leaf< Output, Body >(init_body); 
59         }
60     private:
61         Body body;
62         Body init_body;
63     };
64     
65     //! A functor that takes an Input and generates an Output
66     template< typename Input, typename Output >
67     class function_body : tbb::internal::no_assign {
68     public:
69         virtual ~function_body() {}
70         virtual Output operator()(const Input &input) = 0;
71         virtual function_body* clone() = 0;
72     };
73     
74     //! the leaf for function_body
75     template <typename Input, typename Output, typename B>
76     class function_body_leaf : public function_body< Input, Output > {
77     public:
78         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
79         Output operator()(const Input &i) { return body(i); }
80         B get_body() { return body; }
81         /*override*/ function_body_leaf* clone() {
82             return new function_body_leaf< Input, Output, B >(init_body);
83         }
84     private:
85         B body;
86         B init_body;
87     };
88     
89     //! the leaf for function_body specialized for Input and output of continue_msg
90     template <typename B>
91     class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
92     public:
93         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
94         continue_msg operator()( const continue_msg &i ) { 
95             body(i); 
96             return i; 
97         }
98         B get_body() { return body; }
99         /*override*/ function_body_leaf* clone() {
100            return new function_body_leaf< continue_msg, continue_msg, B >(init_body);
101         }    
102     private:
103         B body;
104         B init_body;
105     };
106     
107     //! the leaf for function_body specialized for Output of continue_msg
108     template <typename Input, typename B>
109     class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
110     public:
111         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
112         continue_msg operator()(const Input &i) { 
113             body(i); 
114             return continue_msg();
115         }
116         B get_body() { return body; }
117         /*override*/ function_body_leaf* clone() {
118             return new function_body_leaf< Input, continue_msg, B >(init_body);
119         }    
120     private:
121         B body;
122         B init_body;
123     };
124     
125     //! the leaf for function_body specialized for Input of continue_msg
126     template <typename Output, typename B>
127     class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
128     public:
129         function_body_leaf( const B &_body ) : body(_body), init_body(_body) { }
130         Output operator()(const continue_msg &i) { 
131             return body(i); 
132         }
133         B get_body() { return body; }
134         /*override*/ function_body_leaf* clone() {
135             return new function_body_leaf< continue_msg, Output, B >(init_body);
136         }    
137     private:
138         B body;
139         B init_body;
140     };
141
142 # if TBB_PREVIEW_GRAPH_NODES
143     //! function_body that takes an Input and a set of output ports
144     template<typename Input, typename OutputSet>
145     class multioutput_function_body {
146     public:
147         virtual ~multioutput_function_body () {}
148         virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
149         virtual multioutput_function_body* clone() = 0;
150     };
151
152     //! leaf for multi-output function.  OutputSet can be a std::tuple or a vector.
153     template<typename Input, typename OutputSet, typename B>
154     class multioutput_function_body_leaf : public multioutput_function_body<Input, OutputSet> {
155     public:
156         multioutput_function_body_leaf(const B &_body) : body(_body), init_body(_body) { }
157         void operator()(const Input &input, OutputSet &oset) {
158             body(input, oset); // body should explicitly put() to one or more of oset.
159         }
160         B get_body() { return body; }
161         /*override*/ multioutput_function_body_leaf* clone() {
162             return new multioutput_function_body_leaf<Input, OutputSet,B>(init_body);
163         }
164     private:
165         B body;
166         B init_body;
167     };
168 #endif  // TBB_PREVIEW_GRAPH_NODES
169     
170     //! A task that calls a node's forward function
171     template< typename NodeType >
172     class forward_task : public task {
173     
174         NodeType &my_node;
175     
176     public:
177     
178         forward_task( NodeType &n ) : my_node(n) {}
179     
180         task *execute() {
181             my_node.forward();
182             return NULL;
183         }
184     };
185     
186     //! A task that calls a node's apply_body function, passing in an input of type Input
187     template< typename NodeType, typename Input >
188     class apply_body_task : public task {
189     
190         NodeType &my_node;
191         Input my_input;
192         
193     public:
194         
195         apply_body_task( NodeType &n, const Input &i ) : my_node(n), my_input(i) {}
196         
197         task *execute() {
198             my_node.apply_body( my_input );
199             return NULL;
200         }
201     };
202     
203     //! A task that calls a node's apply_body function with no input
204     template< typename NodeType >
205     class source_task : public task {
206     
207         NodeType &my_node;
208     
209     public:
210     
211         source_task( NodeType &n ) : my_node(n) {}
212     
213         task *execute() {
214             my_node.apply_body( );
215             return NULL;
216         }
217     };
218     
219     //! An empty functor that takes an Input and returns a default constructed Output
220     template< typename Input, typename Output >
221     struct empty_body {
222        Output operator()( const Input & ) const { return Output(); } 
223     };
224     
225     //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock. 
226     template< typename T, typename M=spin_mutex >
227     class node_cache {
228         public:
229     
230         typedef size_t size_type;
231         
232         bool empty() {
233             typename my_mutex_type::scoped_lock lock( my_mutex );
234             return internal_empty();
235         }
236     
237         void add( T &n ) {
238             typename my_mutex_type::scoped_lock lock( my_mutex );
239             internal_push(n);
240         }
241     
242         void remove( T &n ) {
243             typename my_mutex_type::scoped_lock lock( my_mutex );
244             for ( size_t i = internal_size(); i != 0; --i ) {
245                 T &s = internal_pop();
246                 if ( &s != &n ) {
247                     internal_push(s);
248                 }
249             }
250         }
251         
252     protected:
253     
254         typedef M my_mutex_type;
255         my_mutex_type my_mutex;
256         std::queue< T * > my_q;
257     
258         // Assumes lock is held
259         inline bool internal_empty( )  {
260             return my_q.empty();
261         }
262     
263         // Assumes lock is held
264         inline size_type internal_size( )  {
265             return my_q.size(); 
266         }
267     
268         // Assumes lock is held
269         inline void internal_push( T &n )  {
270             my_q.push(&n);
271         }
272     
273         // Assumes lock is held
274         inline T &internal_pop() {
275             T *v = my_q.front();
276             my_q.pop();
277             return *v;
278         }
279     
280     };
281     
282     //! A cache of predecessors that only supports try_get
283     template< typename T, typename M=spin_mutex >
284     class predecessor_cache : public node_cache< sender<T>, M > {
285         public:
286         typedef M my_mutex_type;
287         typedef T output_type; 
288         typedef sender<output_type> predecessor_type;
289         typedef receiver<output_type> successor_type;
290     
291         predecessor_cache( ) : my_owner( NULL ) { }
292         
293         void set_owner( successor_type *owner ) { my_owner = owner; }
294         
295         bool get_item( output_type &v ) {
296         
297             bool msg = false;
298         
299             do {
300                 predecessor_type *src;
301                 {
302                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
303                     if ( this->internal_empty() ) {
304                         break;
305                     }
306                     src = &this->internal_pop();
307                 }
308         
309                 // Try to get from this sender
310                 msg = src->try_get( v );
311         
312                 if (msg == false) {
313                     // Relinquish ownership of the edge
314                     if ( my_owner) 
315                         src->register_successor( *my_owner );
316                 } else {
317                     // Retain ownership of the edge
318                     this->add(*src);
319                 }
320             } while ( msg == false );
321             return msg;
322         }
323     
324     protected:
325         successor_type *my_owner;
326     };
327     
328     //! An cache of predecessors that supports requests and reservations
329     template< typename T, typename M=spin_mutex >
330     class reservable_predecessor_cache : public predecessor_cache< T, M > {
331     public:
332         typedef M my_mutex_type;
333         typedef T output_type; 
334         typedef sender<T> predecessor_type;
335         typedef receiver<T> successor_type;
336         
337         reservable_predecessor_cache( ) : reserved_src(NULL) { }
338         
339         bool 
340         try_reserve( output_type &v ) {
341             bool msg = false;
342         
343             do {
344                 {
345                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
346                     if ( reserved_src || this->internal_empty() ) 
347                         return false;
348         
349                     reserved_src = &this->internal_pop();
350                 }
351         
352                 // Try to get from this sender
353                 msg = reserved_src->try_reserve( v );
354         
355                 if (msg == false) {
356                     typename my_mutex_type::scoped_lock lock(this->my_mutex);
357                     // Relinquish ownership of the edge
358                     reserved_src->register_successor( *this->my_owner );
359                     reserved_src = NULL;
360                 } else {
361                     // Retain ownership of the edge
362                     this->add( *reserved_src );
363                 }
364             } while ( msg == false );
365         
366             return msg;
367         }
368         
369         bool 
370         try_release( ) {
371             reserved_src->try_release( );
372             reserved_src = NULL;
373             return true;
374         }
375         
376         bool 
377         try_consume( ) {
378             reserved_src->try_consume( );
379             reserved_src = NULL;
380             return true;
381         }
382     
383     private:
384         predecessor_type *reserved_src;
385     };
386     
387     
388     //! An abstract cache of succesors
389     template<typename T, typename M=spin_rw_mutex >
390     class successor_cache : tbb::internal::no_copy {
391     protected:
392         
393         typedef M my_mutex_type;
394         my_mutex_type my_mutex;
395         
396         typedef std::list< receiver<T> * > my_successors_type;
397         my_successors_type my_successors;
398         
399         sender<T> *my_owner;
400         
401     public:
402         
403         successor_cache( ) : my_owner(NULL) {}
404         
405         void set_owner( sender<T> *owner ) { my_owner = owner; }
406         
407         virtual ~successor_cache() {}
408         
409         void register_successor( receiver<T> &r ) {
410             typename my_mutex_type::scoped_lock l(my_mutex, true);
411             my_successors.push_back( &r ); 
412         }
413     
414         void remove_successor( receiver<T> &r ) {
415             typename my_mutex_type::scoped_lock l(my_mutex, true);
416             for ( typename my_successors_type::iterator i = my_successors.begin();
417                   i != my_successors.end(); ++i ) { 
418                 if ( *i == & r ) { 
419                     my_successors.erase(i);
420                     break;
421                 }
422             }
423         }
424         
425         bool empty() { 
426             typename my_mutex_type::scoped_lock l(my_mutex, false);
427             return my_successors.empty(); 
428         }
429         
430         virtual bool try_put( const T &t ) = 0; 
431      };
432     
433     //! An abstract cache of succesors, specialized to continue_msg
434     template<>
435     class successor_cache< continue_msg > : tbb::internal::no_copy {
436     protected:
437         
438         typedef spin_rw_mutex my_mutex_type;
439         my_mutex_type my_mutex;
440         
441         typedef std::list< receiver<continue_msg> * > my_successors_type;
442         my_successors_type my_successors;
443         
444         sender<continue_msg> *my_owner;
445         
446     public:
447         
448         successor_cache( ) : my_owner(NULL) {}
449         
450         void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
451         
452         virtual ~successor_cache() {}
453         
454         void register_successor( receiver<continue_msg> &r ) {
455             my_mutex_type::scoped_lock l(my_mutex, true);
456             my_successors.push_back( &r ); 
457             if ( my_owner ) {
458                 continue_receiver *cr = dynamic_cast< continue_receiver * >(&r);
459                 if ( cr ) 
460                     cr->register_predecessor( *my_owner );
461             }
462         }
463         
464         void remove_successor( receiver<continue_msg> &r ) {
465             my_mutex_type::scoped_lock l(my_mutex, true);
466             for ( my_successors_type::iterator i = my_successors.begin();
467                   i != my_successors.end(); ++i ) { 
468                 if ( *i == & r ) { 
469                     if ( my_owner )
470                         r.remove_predecessor( *my_owner );
471                     my_successors.erase(i);
472                     break;
473                 }
474             }
475         }
476     
477         bool empty() { 
478             my_mutex_type::scoped_lock l(my_mutex, false);
479             return my_successors.empty(); 
480         }
481     
482         virtual bool try_put( const continue_msg &t ) = 0; 
483         
484      };
485     
486     //! A cache of successors that are broadcast to
487     template<typename T, typename M=spin_rw_mutex>
488     class broadcast_cache : public successor_cache<T, M> {
489         typedef M my_mutex_type;
490         typedef std::list< receiver<T> * > my_successors_type;
491         
492     public:
493         
494         broadcast_cache( ) {}
495         
496         bool try_put( const T &t ) {
497             bool msg = false;
498             bool upgraded = false;
499             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
500             typename my_successors_type::iterator i = this->my_successors.begin();
501             while ( i != this->my_successors.end() ) {
502                if ( (*i)->try_put( t ) == true ) {
503                    ++i;
504                    msg = true;
505                } else {
506                   if ( (*i)->register_predecessor(*this->my_owner) ) {
507                       if (!upgraded) {
508                           l.upgrade_to_writer();
509                           upgraded = true;
510                       }
511                       i = this->my_successors.erase(i);
512                   }
513                   else {
514                       ++i;
515                   }
516                }
517             }
518             return msg;
519         }
520     };
521
522     //! A cache of successors that are put in a round-robin fashion
523     template<typename T, typename M=spin_rw_mutex >
524     class round_robin_cache : public successor_cache<T, M> {
525         typedef size_t size_type;
526         typedef M my_mutex_type;
527         typedef std::list< receiver<T> * > my_successors_type;
528     
529     public:
530         
531         round_robin_cache( ) {}
532         
533         size_type size() {
534             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
535             return this->my_successors.size();
536         }
537         
538         bool try_put( const T &t ) {
539             bool upgraded = false;
540             typename my_mutex_type::scoped_lock l(this->my_mutex, false);
541             typename my_successors_type::iterator i = this->my_successors.begin();
542             while ( i != this->my_successors.end() ) {
543                if ( (*i)->try_put( t ) ) {
544                    return true;
545                } else {
546                   if ( (*i)->register_predecessor(*this->my_owner) ) {
547                       if (!upgraded) {
548                           l.upgrade_to_writer();
549                           upgraded = true;
550                       }
551                       i = this->my_successors.erase(i);
552                   }
553                   else {
554                       ++i;
555                   }
556                }
557             }
558             return false;
559         }
560     };
561     
562     template<typename T>
563     class decrementer : public continue_receiver, tbb::internal::no_copy {
564         
565         T *my_node;
566         
567         void execute() {
568             my_node->decrement_counter();
569         }
570         
571     public:
572        
573         typedef continue_msg input_type;
574         typedef continue_msg output_type;
575         decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
576         void set_owner( T *node ) { my_node = node; }
577     };
578     
579 }
580
581 #endif // __TBB__flow_graph_impl_H
582