/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.  You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License.  This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/
#ifndef __TBB_pipeline_H
#define __TBB_pipeline_H

#include "task.h"
#include "tbb_allocator.h"
#include <cstddef>

#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
#include <type_traits>
#endif
41 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
42 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1)
44 typedef unsigned long Token;
45 typedef long tokendiff_t;
48 class pipeline_root_task;
49 class pipeline_cleaner;
51 } // namespace internal
53 namespace interface6 {
54 template<typename T, typename U> class filter_t;
63 //! A stage in a pipeline.
64 /** @ingroup algorithms */
65 class filter: internal::no_copy {
67 //! Value used to mark "not in pipeline"
68 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
70 //! The lowest bit 0 is for parallel vs. serial
71 static const unsigned char filter_is_serial = 0x1;
73 //! 4th bit distinguishes ordered vs unordered filters.
74 /** The bit was not set for parallel filters in TBB 2.1 and earlier,
75 but is_ordered() function always treats parallel filters as out of order. */
76 static const unsigned char filter_is_out_of_order = 0x1<<4;
78 //! 5th bit distinguishes thread-bound and regular filters.
79 static const unsigned char filter_is_bound = 0x1<<5;
81 //! 6th bit marks input filters emitting small objects
82 static const unsigned char filter_may_emit_null = 0x1<<6;
84 //! 7th bit defines exception propagation mode expected by the application.
85 static const unsigned char exact_exception_propagation =
86 #if TBB_USE_CAPTURED_EXCEPTION
90 #endif /* TBB_USE_CAPTURED_EXCEPTION */
92 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
93 static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
96 //! processes multiple items in parallel and in no particular order
97 parallel = current_version | filter_is_out_of_order,
98 //! processes items one at a time; all such filters process items in the same order
99 serial_in_order = current_version | filter_is_serial,
100 //! processes items one at a time and in no particular order
101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
102 //! @deprecated use serial_in_order instead
103 serial = serial_in_order
106 filter( bool is_serial_ ) :
107 next_filter_in_pipeline(not_in_pipeline()),
108 my_input_buffer(NULL),
109 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
110 prev_filter_in_pipeline(not_in_pipeline()),
115 filter( mode filter_mode ) :
116 next_filter_in_pipeline(not_in_pipeline()),
117 my_input_buffer(NULL),
118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
119 prev_filter_in_pipeline(not_in_pipeline()),
124 // signal end-of-input for concrete_filters
125 void __TBB_EXPORTED_METHOD set_end_of_input();
128 //! True if filter is serial.
129 bool is_serial() const {
130 return bool( my_filter_mode & filter_is_serial );
133 //! True if filter must receive stream in order.
134 bool is_ordered() const {
135 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
138 //! True if filter is thread-bound.
139 bool is_bound() const {
140 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
143 //! true if an input filter can emit null
144 bool object_may_be_null() {
145 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
148 //! Operate on an item from the input stream, and return item for output stream.
149 /** Returns NULL if filter is a sink. */
150 virtual void* operator()( void* item ) = 0;
153 /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
154 virtual __TBB_EXPORTED_METHOD ~filter();
156 #if __TBB_TASK_GROUP_CONTEXT
157 //! Destroys item if pipeline was cancelled.
158 /** Required to prevent memory leaks.
159 Note it can be called concurrently even for serial filters.*/
160 virtual void finalize( void* /*item*/ ) {};
164 //! Pointer to next filter in the pipeline.
165 filter* next_filter_in_pipeline;
167 //! has the filter not yet processed all the tokens it will ever see?
168 // (pipeline has not yet reached end_of_input or this filter has not yet
169 // seen the last token produced by input_filter)
170 bool has_more_work();
172 //! Buffer for incoming tokens, or NULL if not required.
173 /** The buffer is required if the filter is serial or follows a thread-bound one. */
174 internal::input_buffer* my_input_buffer;
176 friend class internal::stage_task;
177 friend class internal::pipeline_root_task;
178 friend class pipeline;
179 friend class thread_bound_filter;
181 //! Storage for filter mode and dynamically checked implementation version.
182 const unsigned char my_filter_mode;
184 //! Pointer to previous filter in the pipeline.
185 filter* prev_filter_in_pipeline;
187 //! Pointer to the pipeline.
188 pipeline* my_pipeline;
190 //! Pointer to the next "segment" of filters, or NULL if not required.
191 /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
192 filter* next_segment;
195 //! A stage in a pipeline served by a user thread.
196 /** @ingroup algorithms */
197 class thread_bound_filter: public filter {
200 // item was processed
202 // item is currently not available
204 // there are no more items to process
208 thread_bound_filter(mode filter_mode):
209 filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
211 __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial");
214 //! If a data item is available, invoke operator() on that item.
215 /** This interface is non-blocking.
216 Returns 'success' if an item was processed.
217 Returns 'item_not_available' if no item can be processed now
218 but more may arrive in the future, or if token limit is reached.
219 Returns 'end_of_stream' if there are no more items to process. */
220 result_type __TBB_EXPORTED_METHOD try_process_item();
222 //! Wait until a data item becomes available, and invoke operator() on that item.
223 /** This interface is blocking.
224 Returns 'success' if an item was processed.
225 Returns 'end_of_stream' if there are no more items to process.
226 Never returns 'item_not_available', as it blocks until another return condition applies. */
227 result_type __TBB_EXPORTED_METHOD process_item();
230 //! Internal routine for item processing
231 result_type internal_process_item(bool is_blocking);
234 //! A processing pipeline that applies filters to items.
235 /** @ingroup algorithms */
238 //! Construct empty pipeline.
239 __TBB_EXPORTED_METHOD pipeline();
241 /** Though the current implementation declares the destructor virtual, do not rely on this
242 detail. The virtualness is deprecated and may disappear in future versions of TBB. */
243 virtual __TBB_EXPORTED_METHOD ~pipeline();
245 //! Add filter to end of pipeline.
246 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
248 //! Run the pipeline to completion.
249 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
251 #if __TBB_TASK_GROUP_CONTEXT
252 //! Run the pipeline to completion with user-supplied context.
253 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
256 //! Remove all filters from the pipeline.
257 void __TBB_EXPORTED_METHOD clear();
260 friend class internal::stage_task;
261 friend class internal::pipeline_root_task;
263 friend class thread_bound_filter;
264 friend class internal::pipeline_cleaner;
265 friend class tbb::interface6::internal::pipeline_proxy;
267 //! Pointer to first filter in the pipeline.
270 //! Pointer to location where address of next filter to be added should be stored.
273 //! task who's reference count is used to determine when all stages are done.
276 //! Number of idle tokens waiting for input stage.
277 atomic<internal::Token> input_tokens;
279 //! Global counter of tokens
280 atomic<internal::Token> token_counter;
282 //! False until fetch_input returns NULL.
285 //! True if the pipeline contains a thread-bound filter; false otherwise.
286 bool has_thread_bound_filters;
288 //! Remove filter from pipeline.
289 void remove_filter( filter& filter_ );
291 //! Not used, but retained to satisfy old export files.
292 void __TBB_EXPORTED_METHOD inject_token( task& self );
294 #if __TBB_TASK_GROUP_CONTEXT
295 //! Does clean up if pipeline is cancelled or exception occurred
296 void clear_filters();
300 //------------------------------------------------------------------------
301 // Support for lambda-friendly parallel_pipeline interface
302 //------------------------------------------------------------------------
304 namespace interface6 {
307 template<typename T, typename U, typename Body> class concrete_filter;
310 //! input_filter control to signal end-of-input for parallel_pipeline
312 bool is_pipeline_stopped;
313 flow_control() { is_pipeline_stopped = false; }
314 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
316 void stop() { is_pipeline_stopped = true; }
322 template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };
324 // Obtain type properties in one or another way
325 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
326 template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
327 #elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
328 template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
330 // Explicitly list the types we wish to be placed as-is in the pipeline input_buffers.
331 template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
332 template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
333 template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
334 template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
335 template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
336 template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<int>::value }; };
337 template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
338 template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<long>::value }; };
339 template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
340 template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
341 #endif // Obtaining type properties
343 template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
345 template<typename T, bool> class token_helper;
347 // large object helper (uses tbb_allocator)
349 class token_helper<T, true> {
351 typedef typename tbb::tbb_allocator<T> allocator;
353 typedef T value_type;
354 static pointer create_token(const value_type & source) {
355 pointer output_t = allocator().allocate(1);
356 return new (output_t) T(source);
358 static value_type & token(pointer & t) { return *t;}
359 static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
360 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
361 static void destroy_token(pointer token) {
362 allocator().destroy(token);
363 allocator().deallocate(token,1);
367 // pointer specialization
369 class token_helper<T*, false > {
372 typedef T* value_type;
373 static pointer create_token(const value_type & source) { return source; }
374 static value_type & token(pointer & t) { return t;}
375 static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
376 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
377 static void destroy_token( pointer /*token*/) {}
380 // small object specialization (converts void* to the correct type, passes objects directly.)
382 class token_helper<T, false> {
386 } type_to_void_ptr_map;
388 typedef T pointer; // not really a pointer in this case.
389 typedef T value_type;
390 static pointer create_token(const value_type & source) {
392 static value_type & token(pointer & t) { return t;}
393 static void * cast_to_void_ptr(pointer ref) {
394 type_to_void_ptr_map mymap;
395 mymap.void_overlay = NULL;
396 mymap.actual_value = ref;
397 return mymap.void_overlay;
399 static pointer cast_from_void_ptr(void * ref) {
400 type_to_void_ptr_map mymap;
401 mymap.void_overlay = ref;
402 return mymap.actual_value;
404 static void destroy_token( pointer /*token*/) {}
407 template<typename T, typename U, typename Body>
408 class concrete_filter: public tbb::filter {
410 typedef token_helper<T,is_large_object<T>::value > t_helper;
411 typedef typename t_helper::pointer t_pointer;
412 typedef token_helper<U,is_large_object<U>::value > u_helper;
413 typedef typename u_helper::pointer u_pointer;
415 /*override*/ void* operator()(void* input) {
416 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
417 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
418 t_helper::destroy_token(temp_input);
419 return u_helper::cast_to_void_ptr(output_u);
422 /*override*/ void finalize(void * input) {
423 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
424 t_helper::destroy_token(temp_input);
428 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
432 template<typename U, typename Body>
433 class concrete_filter<void,U,Body>: public filter {
435 typedef token_helper<U, is_large_object<U>::value > u_helper;
436 typedef typename u_helper::pointer u_pointer;
438 /*override*/void* operator()(void*) {
439 flow_control control;
440 u_pointer output_u = u_helper::create_token(my_body(control));
441 if(control.is_pipeline_stopped) {
442 u_helper::destroy_token(output_u);
446 return u_helper::cast_to_void_ptr(output_u);
450 concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
451 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
456 template<typename T, typename Body>
457 class concrete_filter<T,void,Body>: public filter {
459 typedef token_helper<T, is_large_object<T>::value > t_helper;
460 typedef typename t_helper::pointer t_pointer;
462 /*override*/ void* operator()(void* input) {
463 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
464 my_body(t_helper::token(temp_input));
465 t_helper::destroy_token(temp_input);
468 /*override*/ void finalize(void* input) {
469 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
470 t_helper::destroy_token(temp_input);
474 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
477 template<typename Body>
478 class concrete_filter<void,void,Body>: public filter {
481 /** Override privately because it is always called virtually */
482 /*override*/ void* operator()(void*) {
483 flow_control control;
485 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
489 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
492 //! The class that represents an object of the pipeline for parallel_pipeline().
493 /** It primarily serves as RAII class that deletes heap-allocated filter instances. */
494 class pipeline_proxy {
495 tbb::pipeline my_pipe;
497 pipeline_proxy( const filter_t<void,void>& filter_chain );
499 while( filter* f = my_pipe.filter_list )
500 delete f; // filter destructor removes it from the pipeline
502 tbb::pipeline* operator->() { return &my_pipe; }
505 //! Abstract base class that represents a node in a parse tree underlying a filter_t.
506 /** These nodes are always heap-allocated and can be shared by filter_t objects. */
507 class filter_node: tbb::internal::no_copy {
508 /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
509 tbb::atomic<intptr_t> ref_count;
513 #ifdef __TBB_TEST_FILTER_NODE_COUNT
514 ++(__TBB_TEST_FILTER_NODE_COUNT);
518 //! Add concrete_filter to pipeline
519 virtual void add_to( pipeline& ) = 0;
520 //! Increment reference count
521 void add_ref() {++ref_count;}
522 //! Decrement reference count and delete if it becomes zero.
524 __TBB_ASSERT(ref_count>0,"ref_count underflow");
528 virtual ~filter_node() {
529 #ifdef __TBB_TEST_FILTER_NODE_COUNT
530 --(__TBB_TEST_FILTER_NODE_COUNT);
535 //! Node in parse tree representing result of make_filter.
536 template<typename T, typename U, typename Body>
537 class filter_node_leaf: public filter_node {
538 const tbb::filter::mode mode;
540 /*override*/void add_to( pipeline& p ) {
541 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
545 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
548 //! Node in parse tree representing join of two filters.
549 class filter_node_join: public filter_node {
550 friend class filter_node; // to suppress GCC 3.2 warnings
553 /*override*/~filter_node_join() {
557 /*override*/void add_to( pipeline& p ) {
562 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
568 } // namespace internal
571 //! Create a filter to participate in parallel_pipeline
572 template<typename T, typename U, typename Body>
573 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
574 return new internal::filter_node_leaf<T,U,Body>(mode, body);
577 template<typename T, typename V, typename U>
578 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
579 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
580 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
581 return new internal::filter_node_join(*left.root,*right.root);
584 //! Class representing a chain of type-safe pipeline filters
585 template<typename T, typename U>
587 typedef internal::filter_node filter_node;
589 filter_t( filter_node* root_ ) : root(root_) {
592 friend class internal::pipeline_proxy;
593 template<typename T_, typename U_, typename Body>
594 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
595 template<typename T_, typename V_, typename U_>
596 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
598 filter_t() : root(NULL) {}
599 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
600 if( root ) root->add_ref();
602 template<typename Body>
603 filter_t( tbb::filter::mode mode, const Body& body ) :
604 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
608 void operator=( const filter_t<T,U>& rhs ) {
609 // Order of operations below carefully chosen so that reference counts remain correct
610 // in unlikely event that remove_ref throws exception.
611 filter_node* old = root;
613 if( root ) root->add_ref();
614 if( old ) old->remove_ref();
617 if( root ) root->remove_ref();
620 // Like operator= with filter_t() on right side.
622 filter_node* old = root;
629 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
630 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
631 filter_chain.root->add_to(my_pipe);
634 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
635 #if __TBB_TASK_GROUP_CONTEXT
636 , tbb::task_group_context& context
639 internal::pipeline_proxy pipe(filter_chain);
640 // tbb::pipeline::run() is called via the proxy
641 pipe->run(max_number_of_live_tokens
642 #if __TBB_TASK_GROUP_CONTEXT
648 #if __TBB_TASK_GROUP_CONTEXT
649 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
650 tbb::task_group_context context;
651 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
653 #endif // __TBB_TASK_GROUP_CONTEXT
657 using interface6::flow_control;
658 using interface6::filter_t;
659 using interface6::make_filter;
660 using interface6::parallel_pipeline;
664 #endif /* __TBB_pipeline_H */