]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/pipeline.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / dependencies64 / tbb / include / tbb / pipeline.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_pipeline_H 
30 #define __TBB_pipeline_H 
31
32 #include "atomic.h"
33 #include "task.h"
34 #include "tbb_allocator.h"
35 #include <cstddef>
36
37 namespace tbb {
38
39 class pipeline;
40 class filter;
41
42 //! @cond INTERNAL
43 namespace internal {
44
45 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
46 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
47
48 typedef unsigned long Token;
49 typedef long tokendiff_t;
50 class stage_task;
51 class input_buffer;
52 class pipeline_root_task;
53 class pipeline_cleaner;
54
55 } // namespace internal
56
57 namespace interface6 {
58     template<typename T, typename U> class filter_t;
59
60     namespace internal {
61         class pipeline_proxy;
62     }
63 }
64
65 //! @endcond
66
67 //! A stage in a pipeline.
68 /** @ingroup algorithms */
69 class filter: internal::no_copy {
70 private:
71     //! Value used to mark "not in pipeline"
72     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
73 protected:    
74     //! The lowest bit 0 is for parallel vs. serial
75     static const unsigned char filter_is_serial = 0x1; 
76
77     //! 4th bit distinguishes ordered vs unordered filters.
78     /** The bit was not set for parallel filters in TBB 2.1 and earlier,
79         but is_ordered() function always treats parallel filters as out of order. */
80     static const unsigned char filter_is_out_of_order = 0x1<<4;  
81
82     //! 5th bit distinguishes thread-bound and regular filters.
83     static const unsigned char filter_is_bound = 0x1<<5;  
84
85     //! 6th bit marks input filters emitting small objects
86     static const unsigned char filter_may_emit_null = 0x1<<6;
87
88     //! 7th bit defines exception propagation mode expected by the application.
89     static const unsigned char exact_exception_propagation =
90 #if TBB_USE_CAPTURED_EXCEPTION
91             0x0;
92 #else
93             0x1<<7;
94 #endif /* TBB_USE_CAPTURED_EXCEPTION */
95
96     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
97     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
98 public:
99     enum mode {
100         //! processes multiple items in parallel and in no particular order
101         parallel = current_version | filter_is_out_of_order, 
102         //! processes items one at a time; all such filters process items in the same order
103         serial_in_order = current_version | filter_is_serial,
104         //! processes items one at a time and in no particular order
105         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
106         //! @deprecated use serial_in_order instead
107         serial = serial_in_order
108     };
109 protected:
110     filter( bool is_serial_ ) : 
111         next_filter_in_pipeline(not_in_pipeline()),
112         my_input_buffer(NULL),
113         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
114         prev_filter_in_pipeline(not_in_pipeline()),
115         my_pipeline(NULL),
116         next_segment(NULL)
117     {}
118     
119     filter( mode filter_mode ) :
120         next_filter_in_pipeline(not_in_pipeline()),
121         my_input_buffer(NULL),
122         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
123         prev_filter_in_pipeline(not_in_pipeline()),
124         my_pipeline(NULL),
125         next_segment(NULL)
126     {}
127
128     // signal end-of-input for concrete_filters
129     void __TBB_EXPORTED_METHOD set_end_of_input();
130
131 public:
132     //! True if filter is serial.
133     bool is_serial() const {
134         return bool( my_filter_mode & filter_is_serial );
135     }  
136     
137     //! True if filter must receive stream in order.
138     bool is_ordered() const {
139         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
140     }
141
142     //! True if filter is thread-bound.
143     bool is_bound() const {
144         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
145     }
146
147     //! true if an input filter can emit null
148     bool object_may_be_null() { 
149         return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
150     }
151
152     //! Operate on an item from the input stream, and return item for output stream.
153     /** Returns NULL if filter is a sink. */
154     virtual void* operator()( void* item ) = 0;
155
156     //! Destroy filter.  
157     /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
158     virtual __TBB_EXPORTED_METHOD ~filter();
159
160 #if __TBB_TASK_GROUP_CONTEXT
161     //! Destroys item if pipeline was cancelled.
162     /** Required to prevent memory leaks.
163         Note it can be called concurrently even for serial filters.*/
164     virtual void finalize( void* /*item*/ ) {};
165 #endif
166
167 private:
168     //! Pointer to next filter in the pipeline.
169     filter* next_filter_in_pipeline;
170
171     //! has the filter not yet processed all the tokens it will ever see?  
172     //  (pipeline has not yet reached end_of_input or this filter has not yet
173     //  seen the last token produced by input_filter)
174     bool has_more_work();
175
176     //! Buffer for incoming tokens, or NULL if not required.
177     /** The buffer is required if the filter is serial or follows a thread-bound one. */
178     internal::input_buffer* my_input_buffer;
179
180     friend class internal::stage_task;
181     friend class internal::pipeline_root_task;
182     friend class pipeline;
183     friend class thread_bound_filter;
184
185     //! Storage for filter mode and dynamically checked implementation version.
186     const unsigned char my_filter_mode;
187
188     //! Pointer to previous filter in the pipeline.
189     filter* prev_filter_in_pipeline;
190
191     //! Pointer to the pipeline.
192     pipeline* my_pipeline;
193
194     //! Pointer to the next "segment" of filters, or NULL if not required.
195     /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
196     filter* next_segment;
197 };
198
199 //! A stage in a pipeline served by a user thread.
200 /** @ingroup algorithms */
201 class thread_bound_filter: public filter {
202 public:
203     enum result_type {
204         // item was processed
205         success,
206         // item is currently not available
207         item_not_available,
208         // there are no more items to process
209         end_of_stream
210     };
211 protected:
212     thread_bound_filter(mode filter_mode): 
213          filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
214     {}
215 public:
216     //! If a data item is available, invoke operator() on that item.  
217     /** This interface is non-blocking.
218         Returns 'success' if an item was processed.
219         Returns 'item_not_available' if no item can be processed now 
220         but more may arrive in the future, or if token limit is reached. 
221         Returns 'end_of_stream' if there are no more items to process. */
222     result_type __TBB_EXPORTED_METHOD try_process_item(); 
223
224     //! Wait until a data item becomes available, and invoke operator() on that item.
225     /** This interface is blocking.
226         Returns 'success' if an item was processed.
227         Returns 'end_of_stream' if there are no more items to process.
228         Never returns 'item_not_available', as it blocks until another return condition applies. */
229     result_type __TBB_EXPORTED_METHOD process_item();
230
231 private:
232     //! Internal routine for item processing
233     result_type internal_process_item(bool is_blocking);
234 };
235
236 //! A processing pipeline that applies filters to items.
237 /** @ingroup algorithms */
238 class pipeline {
239 public:
240     //! Construct empty pipeline.
241     __TBB_EXPORTED_METHOD pipeline();
242
243     /** Though the current implementation declares the destructor virtual, do not rely on this 
244         detail.  The virtualness is deprecated and may disappear in future versions of TBB. */
245     virtual __TBB_EXPORTED_METHOD ~pipeline();
246
247     //! Add filter to end of pipeline.
248     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
249
250     //! Run the pipeline to completion.
251     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
252
253 #if __TBB_TASK_GROUP_CONTEXT
254     //! Run the pipeline to completion with user-supplied context.
255     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
256 #endif
257
258     //! Remove all filters from the pipeline.
259     void __TBB_EXPORTED_METHOD clear();
260
261 private:
262     friend class internal::stage_task;
263     friend class internal::pipeline_root_task;
264     friend class filter;
265     friend class thread_bound_filter;
266     friend class internal::pipeline_cleaner;
267     friend class tbb::interface6::internal::pipeline_proxy;
268
269     //! Pointer to first filter in the pipeline.
270     filter* filter_list;
271
272     //! Pointer to location where address of next filter to be added should be stored.
273     filter* filter_end;
274
275     //! task who's reference count is used to determine when all stages are done.
276     task* end_counter;
277
278     //! Number of idle tokens waiting for input stage.
279     atomic<internal::Token> input_tokens;
280
281     //! Global counter of tokens 
282     atomic<internal::Token> token_counter;
283
284     //! False until fetch_input returns NULL.
285     bool end_of_input;
286
287     //! True if the pipeline contains a thread-bound filter; false otherwise.
288     bool has_thread_bound_filters;
289
290     //! Remove filter from pipeline.
291     void remove_filter( filter& filter_ );
292
293     //! Not used, but retained to satisfy old export files.
294     void __TBB_EXPORTED_METHOD inject_token( task& self );
295
296 #if __TBB_TASK_GROUP_CONTEXT
297     //! Does clean up if pipeline is cancelled or exception occured
298     void clear_filters();
299 #endif
300 };
301
302 //------------------------------------------------------------------------
303 // Support for lambda-friendly parallel_pipeline interface
304 //------------------------------------------------------------------------
305
306 namespace interface6 {
307
308 namespace internal {
309     template<typename T, typename U, typename Body> class concrete_filter;
310 }
311
312 //! input_filter control to signal end-of-input for parallel_pipeline
313 class flow_control {
314     bool is_pipeline_stopped;
315     flow_control() { is_pipeline_stopped = false; }
316     template<typename T, typename U, typename Body> friend class internal::concrete_filter;
317 public:
318     void stop() { is_pipeline_stopped = true; }
319 };
320
321 //! @cond INTERNAL
322 namespace internal {
323
324 template<typename T> struct is_large_object { enum { r = sizeof(T) > sizeof(void *) }; };
325
326 template<typename T, bool> class token_helper;
327
328 // large object helper (uses tbb_allocator)
329 template<typename T>
330 class token_helper<T, true> {
331     public:
332     typedef typename tbb::tbb_allocator<T> allocator;
333     typedef T* pointer;
334     typedef T value_type;
335     static pointer create_token(const value_type & source) {
336         pointer output_t = allocator().allocate(1);
337         return new (output_t) T(source);
338     }
339     static value_type & token(pointer & t) { return *t;}
340     static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
341     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
342     static void destroy_token(pointer token) {
343         allocator().destroy(token);
344         allocator().deallocate(token,1);
345     }
346 };
347
348 // pointer specialization
349 template<typename T>
350 class token_helper<T*, false > {
351     public:
352     typedef T* pointer;
353     typedef T* value_type;
354     static pointer create_token(const value_type & source) { return source; }
355     static value_type & token(pointer & t) { return t;}
356     static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
357     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
358     static void destroy_token( pointer /*token*/) {}
359 };
360
361 // small object specialization (converts void* to the correct type, passes objects directly.)
362 template<typename T>
363 class token_helper<T, false> {
364     typedef union {
365         T actual_value;
366         void * void_overlay;
367     } type_to_void_ptr_map;
368     public:
369     typedef T pointer;  // not really a pointer in this case.
370     typedef T value_type;
371     static pointer create_token(const value_type & source) {
372         return source; }
373     static value_type & token(pointer & t) { return t;}
374     static void * cast_to_void_ptr(pointer ref) { 
375         type_to_void_ptr_map mymap; 
376         mymap.void_overlay = NULL;
377         mymap.actual_value = ref; 
378         return mymap.void_overlay; 
379     }
380     static pointer cast_from_void_ptr(void * ref) { 
381         type_to_void_ptr_map mymap;
382         mymap.void_overlay = ref;
383         return mymap.actual_value;
384     }
385     static void destroy_token( pointer /*token*/) {}
386 };
387
388 template<typename T, typename U, typename Body>
389 class concrete_filter: public tbb::filter {
390     const Body& my_body;
391     typedef token_helper<T,is_large_object<T>::r > t_helper;
392     typedef typename t_helper::pointer t_pointer;
393     typedef token_helper<U,is_large_object<U>::r > u_helper;
394     typedef typename u_helper::pointer u_pointer;
395
396     /*override*/ void* operator()(void* input) {
397         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
398         u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
399         t_helper::destroy_token(temp_input);
400         return u_helper::cast_to_void_ptr(output_u);
401     }
402
403 public:
404     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
405 };
406
407 // input 
408 template<typename U, typename Body>
409 class concrete_filter<void,U,Body>: public filter {
410     const Body& my_body;
411     typedef token_helper<U, is_large_object<U>::r > u_helper;
412     typedef typename u_helper::pointer u_pointer;
413
414     /*override*/void* operator()(void*) {
415         flow_control control;
416         u_pointer output_u = u_helper::create_token(my_body(control));
417         if(control.is_pipeline_stopped) {
418             u_helper::destroy_token(output_u);
419             set_end_of_input();
420             return NULL;
421         }
422         return u_helper::cast_to_void_ptr(output_u);
423     }
424
425 public:
426     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : 
427         filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
428         my_body(body)
429     {}
430 };
431
432 template<typename T, typename Body>
433 class concrete_filter<T,void,Body>: public filter {
434     const Body& my_body;
435     typedef token_helper<T, is_large_object<T>::r > t_helper;
436     typedef typename t_helper::pointer t_pointer;
437    
438     /*override*/ void* operator()(void* input) {
439         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
440         my_body(t_helper::token(temp_input));
441         t_helper::destroy_token(temp_input);
442         return NULL;
443     }
444 public:
445     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
446 };
447
448 template<typename Body>
449 class concrete_filter<void,void,Body>: public filter {
450     const Body& my_body;
451     
452     /** Override privately because it is always called virtually */
453     /*override*/ void* operator()(void*) {
454         flow_control control;
455         my_body(control);
456         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
457         return output;
458     }
459 public:
460     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
461 };
462
463 //! The class that represents an object of the pipeline for parallel_pipeline().
464 /** It primarily serves as RAII class that deletes heap-allocated filter instances. */
465 class pipeline_proxy {
466     tbb::pipeline my_pipe;
467 public:
468     pipeline_proxy( const filter_t<void,void>& filter_chain );
469     ~pipeline_proxy() {
470         while( filter* f = my_pipe.filter_list ) 
471             delete f; // filter destructor removes it from the pipeline
472     }
473     tbb::pipeline* operator->() { return &my_pipe; }
474 };
475
476 //! Abstract base class that represents a node in a parse tree underlying a filter_t.
477 /** These nodes are always heap-allocated and can be shared by filter_t objects. */
478 class filter_node: tbb::internal::no_copy {
479     /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
480     tbb::atomic<intptr_t> ref_count;
481 protected:
482     filter_node() {
483         ref_count = 0;
484 #ifdef __TBB_TEST_FILTER_NODE_COUNT
485         ++(__TBB_TEST_FILTER_NODE_COUNT);
486 #endif
487     }
488 public:
489     //! Add concrete_filter to pipeline 
490     virtual void add_to( pipeline& ) = 0;
491     //! Increment reference count
492     void add_ref() {++ref_count;}
493     //! Decrement reference count and delete if it becomes zero.
494     void remove_ref() {
495         __TBB_ASSERT(ref_count>0,"ref_count underflow");
496         if( --ref_count==0 ) 
497             delete this;
498     }
499     virtual ~filter_node() {
500 #ifdef __TBB_TEST_FILTER_NODE_COUNT
501         --(__TBB_TEST_FILTER_NODE_COUNT);
502 #endif
503     }
504 };
505
506 //! Node in parse tree representing result of make_filter.
507 template<typename T, typename U, typename Body>
508 class filter_node_leaf: public filter_node  {
509     const tbb::filter::mode mode;
510     const Body body;
511     /*override*/void add_to( pipeline& p ) {
512         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
513         p.add_filter( *f );
514     }
515 public:
516     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
517 };
518
519 //! Node in parse tree representing join of two filters.
520 class filter_node_join: public filter_node {
521     friend class filter_node; // to suppress GCC 3.2 warnings
522     filter_node& left;
523     filter_node& right;
524     /*override*/~filter_node_join() {
525        left.remove_ref();
526        right.remove_ref();
527     }
528     /*override*/void add_to( pipeline& p ) {
529         left.add_to(p);
530         right.add_to(p);
531     }
532 public:
533     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
534        left.add_ref();
535        right.add_ref();
536     }
537 };
538
539 } // namespace internal
540 //! @endcond
541
542 //! Create a filter to participate in parallel_pipeline
543 template<typename T, typename U, typename Body>
544 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
545     return new internal::filter_node_leaf<T,U,Body>(mode, body);
546 }
547
548 template<typename T, typename V, typename U>
549 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
550     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
551     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
552     return new internal::filter_node_join(*left.root,*right.root);
553 }
554
555 //! Class representing a chain of type-safe pipeline filters
556 template<typename T, typename U>
557 class filter_t {
558     typedef internal::filter_node filter_node;
559     filter_node* root;
560     filter_t( filter_node* root_ ) : root(root_) {
561         root->add_ref();
562     }
563     friend class internal::pipeline_proxy;
564     template<typename T_, typename U_, typename Body>
565     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
566     template<typename T_, typename V_, typename U_>
567     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
568 public:
569     filter_t() : root(NULL) {}
570     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
571         if( root ) root->add_ref();
572     }
573     template<typename Body>
574     filter_t( tbb::filter::mode mode, const Body& body ) :
575         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
576         root->add_ref();
577     }
578
579     void operator=( const filter_t<T,U>& rhs ) {
580         // Order of operations below carefully chosen so that reference counts remain correct
581         // in unlikely event that remove_ref throws exception.
582         filter_node* old = root;
583         root = rhs.root; 
584         if( root ) root->add_ref();
585         if( old ) old->remove_ref();
586     }
587     ~filter_t() {
588         if( root ) root->remove_ref();
589     }
590     void clear() {
591         // Like operator= with filter_t() on right side.
592         if( root ) {
593             filter_node* old = root;
594             root = NULL;
595             old->remove_ref();
596         }
597     }
598 };
599
600 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
601     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
602     filter_chain.root->add_to(my_pipe);
603 }
604
605 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
606 #if __TBB_TASK_GROUP_CONTEXT
607     , tbb::task_group_context& context
608 #endif
609     ) {
610     internal::pipeline_proxy pipe(filter_chain);
611     // tbb::pipeline::run() is called via the proxy
612     pipe->run(max_number_of_live_tokens
613 #if __TBB_TASK_GROUP_CONTEXT
614               , context
615 #endif
616     );
617 }
618
619 #if __TBB_TASK_GROUP_CONTEXT
620 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
621     tbb::task_group_context context;
622     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
623 }
624 #endif // __TBB_TASK_GROUP_CONTEXT
625
626 } // interface6
627
628 using interface6::flow_control;
629 using interface6::filter_t;
630 using interface6::make_filter;
631 using interface6::parallel_pipeline;
632
633 } // tbb
634
635 #endif /* __TBB_pipeline_H */