2 Copyright 2005-2010 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB_pipeline_H
30 #define __TBB_pipeline_H
//! Encodes a pipeline interface version number into the version bits of a filter mode.
/** The argument should be an integer between 2 and 9; the result is (x-2) shifted
    left by one bit. Both the argument and the complete expansion are parenthesized
    so that the macro composes correctly with neighbouring operators (for example
    '+', whose precedence is higher than that of '<<') and with compound arguments
    such as conditional expressions. */
#define __TBB_PIPELINE_VERSION(x) (static_cast<unsigned char>((x)-2)<<1)
47 typedef unsigned long Token; //!< Unsigned integer type used for the pipeline's token counters.
48 typedef long tokendiff_t; //!< Signed integer type; presumably holds differences between Token values (TODO confirm against uses).
51 class pipeline_root_task;
52 class pipeline_cleaner;
54 } // namespace internal
56 namespace interface5 {
57 template<typename T, typename U> class filter_t;
66 //! A stage in a pipeline.
67 /** @ingroup algorithms */
68 class filter: internal::no_copy {
70 //! Value used to mark "not in pipeline"
71 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
73 //! The lowest bit 0 is for parallel vs. serial
74 static const unsigned char filter_is_serial = 0x1;
76 //! Bit 4 (mask 0x10) distinguishes ordered vs unordered filters.
77 /** The bit was not set for parallel filters in TBB 2.1 and earlier,
78 but is_ordered() function always treats parallel filters as out of order. */
79 static const unsigned char filter_is_out_of_order = 0x1<<4;
81 //! Bit 5 (mask 0x20) distinguishes thread-bound and regular filters.
82 static const unsigned char filter_is_bound = 0x1<<5;
84 //! 7th bit defines exception propagation mode expected by the application.
85 static const unsigned char exact_exception_propagation =
86 #if TBB_USE_CAPTURED_EXCEPTION
90 #endif /* TBB_USE_CAPTURED_EXCEPTION */
92 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
93 static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
96 //! processes multiple items in parallel and in no particular order
97 parallel = current_version | filter_is_out_of_order,
98 //! processes items one at a time; all such filters process items in the same order
99 serial_in_order = current_version | filter_is_serial,
100 //! processes items one at a time and in no particular order
101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
102 //! @deprecated use serial_in_order instead
103 serial = serial_in_order
106 filter( bool is_serial_ ) :
107 next_filter_in_pipeline(not_in_pipeline()),
108 my_input_buffer(NULL),
109 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
110 prev_filter_in_pipeline(not_in_pipeline()),
115 filter( mode filter_mode ) :
116 next_filter_in_pipeline(not_in_pipeline()),
117 my_input_buffer(NULL),
118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
119 prev_filter_in_pipeline(not_in_pipeline()),
125 //! True if filter is serial.
126 bool is_serial() const {
127 return bool( my_filter_mode & filter_is_serial );
130 //! True if filter must receive stream in order.
131 bool is_ordered() const {
132 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
135 //! True if filter is thread-bound.
136 bool is_bound() const {
137 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
140 //! Operate on an item from the input stream, and return item for output stream.
141 /** Returns NULL if filter is a sink. */
142 virtual void* operator()( void* item ) = 0;
145 /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
146 virtual __TBB_EXPORTED_METHOD ~filter();
148 #if __TBB_TASK_GROUP_CONTEXT
149 //! Destroys item if pipeline was cancelled.
150 /** Required to prevent memory leaks.
151 Note it can be called concurrently even for serial filters.*/
152 virtual void finalize( void* /*item*/ ) {};
156 //! Pointer to next filter in the pipeline.
157 filter* next_filter_in_pipeline;
159 //! Buffer for incoming tokens, or NULL if not required.
160 /** The buffer is required if the filter is serial or follows a thread-bound one. */
161 internal::input_buffer* my_input_buffer;
163 friend class internal::stage_task;
164 friend class internal::pipeline_root_task;
165 friend class pipeline;
166 friend class thread_bound_filter;
168 //! Storage for filter mode and dynamically checked implementation version.
169 const unsigned char my_filter_mode;
171 //! Pointer to previous filter in the pipeline.
172 filter* prev_filter_in_pipeline;
174 //! Pointer to the pipeline.
175 pipeline* my_pipeline;
177 //! Pointer to the next "segment" of filters, or NULL if not required.
178 /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
179 filter* next_segment;
182 //! A stage in a pipeline served by a user thread.
183 /** @ingroup algorithms */
184 class thread_bound_filter: public filter {
187 // item was processed
189 // item is currently not available
191 // there are no more items to process
195 thread_bound_filter(mode filter_mode):
196 filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
199 //! If a data item is available, invoke operator() on that item.
200 /** This interface is non-blocking.
201 Returns 'success' if an item was processed.
202 Returns 'item_not_available' if no item can be processed now
203 but more may arrive in the future, or if token limit is reached.
204 Returns 'end_of_stream' if there are no more items to process. */
205 result_type __TBB_EXPORTED_METHOD try_process_item();
207 //! Wait until a data item becomes available, and invoke operator() on that item.
208 /** This interface is blocking.
209 Returns 'success' if an item was processed.
210 Returns 'end_of_stream' if there are no more items to process.
211 Never returns 'item_not_available', as it blocks until another return condition applies. */
212 result_type __TBB_EXPORTED_METHOD process_item();
215 //! Internal routine for item processing
216 result_type internal_process_item(bool is_blocking);
219 //! A processing pipeline that applies filters to items.
220 /** @ingroup algorithms */
223 //! Construct empty pipeline.
224 __TBB_EXPORTED_METHOD pipeline();
226 /** Though the current implementation declares the destructor virtual, do not rely on this
227 detail. The virtualness is deprecated and may disappear in future versions of TBB. */
228 virtual __TBB_EXPORTED_METHOD ~pipeline();
230 //! Add filter to end of pipeline.
231 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
233 //! Run the pipeline to completion.
234 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
236 #if __TBB_TASK_GROUP_CONTEXT
237 //! Run the pipeline to completion with user-supplied context.
238 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
241 //! Remove all filters from the pipeline.
242 void __TBB_EXPORTED_METHOD clear();
245 friend class internal::stage_task;
246 friend class internal::pipeline_root_task;
248 friend class thread_bound_filter;
249 friend class internal::pipeline_cleaner;
250 friend class tbb::interface5::internal::pipeline_proxy;
252 //! Pointer to first filter in the pipeline.
255 //! Pointer to location where address of next filter to be added should be stored.
258 //! Task whose reference count is used to determine when all stages are done.
261 //! Number of idle tokens waiting for input stage.
262 atomic<internal::Token> input_tokens;
264 //! Global counter of tokens
265 atomic<internal::Token> token_counter;
267 //! False until fetch_input returns NULL.
270 //! True if the pipeline contains a thread-bound filter; false otherwise.
271 bool has_thread_bound_filters;
273 //! Remove filter from pipeline.
274 void remove_filter( filter& filter_ );
276 //! Not used, but retained to satisfy old export files.
277 void __TBB_EXPORTED_METHOD inject_token( task& self );
279 #if __TBB_TASK_GROUP_CONTEXT
280 //! Cleans up if the pipeline is cancelled or an exception occurred.
281 void clear_filters();
285 //------------------------------------------------------------------------
286 // Support for lambda-friendly parallel_pipeline interface
287 //------------------------------------------------------------------------
289 namespace interface5 {
292 template<typename T, typename U, typename Body> class concrete_filter;
296 bool is_pipeline_stopped;
297 flow_control() { is_pipeline_stopped = false; }
298 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
300 void stop() { is_pipeline_stopped = true; }
306 template<typename T, typename U, typename Body>
307 class concrete_filter: public tbb::filter {
310 /*override*/ void* operator()(void* input) {
311 T* temp_input = (T*)input;
312 // Call user's operator()() here
313 void* output = (void*) new U(my_body(*temp_input));
319 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
322 template<typename U, typename Body>
323 class concrete_filter<void,U,Body>: public filter {
326 /*override*/void* operator()(void*) {
327 flow_control control;
328 U temp_output = my_body(control);
329 void* output = control.is_pipeline_stopped ? NULL : (void*) new U(temp_output);
333 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
336 template<typename T, typename Body>
337 class concrete_filter<T,void,Body>: public filter {
340 /*override*/ void* operator()(void* input) {
341 T* temp_input = (T*)input;
342 my_body(*temp_input);
347 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
350 template<typename Body>
351 class concrete_filter<void,void,Body>: public filter {
354 /** Override privately because it is always called virtually */
355 /*override*/ void* operator()(void*) {
356 flow_control control;
358 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
362 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
365 //! The class that represents an object of the pipeline for parallel_pipeline().
366 /** It primarily serves as RAII class that deletes heap-allocated filter instances. */
367 class pipeline_proxy {
368 tbb::pipeline my_pipe;
370 pipeline_proxy( const filter_t<void,void>& filter_chain );
372 while( filter* f = my_pipe.filter_list )
373 delete f; // filter destructor removes it from the pipeline
375 tbb::pipeline* operator->() { return &my_pipe; }
378 //! Abstract base class that represents a node in a parse tree underlying a filter_t.
379 /** These nodes are always heap-allocated and can be shared by filter_t objects. */
380 class filter_node: tbb::internal::no_copy {
381 /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
382 tbb::atomic<intptr_t> ref_count;
386 #ifdef __TBB_TEST_FILTER_NODE_COUNT
387 ++(__TBB_TEST_FILTER_NODE_COUNT);
391 //! Add concrete_filter to pipeline
392 virtual void add_to( pipeline& ) = 0;
393 //! Increment reference count
394 void add_ref() {++ref_count;}
395 //! Decrement reference count and delete if it becomes zero.
397 __TBB_ASSERT(ref_count>0,"ref_count underflow");
401 virtual ~filter_node() {
402 #ifdef __TBB_TEST_FILTER_NODE_COUNT
403 --(__TBB_TEST_FILTER_NODE_COUNT);
408 //! Node in parse tree representing result of make_filter.
409 template<typename T, typename U, typename Body>
410 class filter_node_leaf: public filter_node {
411 const tbb::filter::mode mode;
413 /*override*/void add_to( pipeline& p ) {
414 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
418 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
421 //! Node in parse tree representing join of two filters.
422 class filter_node_join: public filter_node {
423 friend class filter_node; // to suppress GCC 3.2 warnings
426 /*override*/~filter_node_join() {
430 /*override*/void add_to( pipeline& p ) {
435 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
441 } // namespace internal
444 template<typename T, typename U, typename Body>
445 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
446 return new internal::filter_node_leaf<T,U,Body>(mode, body);
449 template<typename T, typename V, typename U>
450 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
451 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
452 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
453 return new internal::filter_node_join(*left.root,*right.root);
456 //! Class representing a chain of type-safe pipeline filters
457 template<typename T, typename U>
459 typedef internal::filter_node filter_node;
461 filter_t( filter_node* root_ ) : root(root_) {
464 friend class internal::pipeline_proxy;
465 template<typename T_, typename U_, typename Body>
466 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
467 template<typename T_, typename V_, typename U_>
468 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
470 filter_t() : root(NULL) {}
471 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
472 if( root ) root->add_ref();
474 template<typename Body>
475 filter_t( tbb::filter::mode mode, const Body& body ) :
476 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
480 void operator=( const filter_t<T,U>& rhs ) {
481 // Order of operations below carefully chosen so that reference counts remain correct
482 // in unlikely event that remove_ref throws exception.
483 filter_node* old = root;
485 if( root ) root->add_ref();
486 if( old ) old->remove_ref();
489 if( root ) root->remove_ref();
492 // Like operator= with filter_t() on right side.
494 filter_node* old = root;
501 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
502 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
503 filter_chain.root->add_to(my_pipe);
506 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
507 #if __TBB_TASK_GROUP_CONTEXT
508 , tbb::task_group_context& context
511 internal::pipeline_proxy pipe(filter_chain);
512 // tbb::pipeline::run() is called via the proxy
513 pipe->run(max_number_of_live_tokens
514 #if __TBB_TASK_GROUP_CONTEXT
520 #if __TBB_TASK_GROUP_CONTEXT
521 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
522 tbb::task_group_context context;
523 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
525 #endif // __TBB_TASK_GROUP_CONTEXT
529 using interface5::flow_control;
530 using interface5::filter_t;
531 using interface5::make_filter;
532 using interface5::parallel_pipeline;
536 #endif /* __TBB_pipeline_H */