]> git.sesse.net Git - casparcg/blob - tbb30_20100406oss/include/tbb/parallel_while.h
21c2bc185bbfcf9a54b5995b9d840970975fa8a8
[casparcg] / tbb30_20100406oss / include / tbb / parallel_while.h
1 /*
2     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_parallel_while
30 #define __TBB_parallel_while
31
32 #include "task.h"
33 #include <new>
34
35 namespace tbb {
36
37 template<typename Body>
38 class parallel_while;
39
40 //! @cond INTERNAL
41 namespace internal {
42
43     template<typename Stream, typename Body> class while_task;
44
45     //! For internal use only.
46     /** Executes one iteration of a while.
47         @ingroup algorithms */
48     template<typename Body>
49     class while_iteration_task: public task {
50         const Body& my_body;
51         typename Body::argument_type my_value;
52         /*override*/ task* execute() {
53             my_body(my_value); 
54             return NULL;
55         }
56         while_iteration_task( const typename Body::argument_type& value, const Body& body ) : 
57             my_body(body), my_value(value)
58         {}
59         template<typename Body_> friend class while_group_task;
60         friend class tbb::parallel_while<Body>;
61     };
62
63     //! For internal use only
64     /** Unpacks a block of iterations.
65         @ingroup algorithms */
66     template<typename Body>
67     class while_group_task: public task {
68         static const size_t max_arg_size = 4;         
69         const Body& my_body;
70         size_t size;
71         typename Body::argument_type my_arg[max_arg_size];
72         while_group_task( const Body& body ) : my_body(body), size(0) {} 
73         /*override*/ task* execute() {
74             typedef while_iteration_task<Body> iteration_type;
75             __TBB_ASSERT( size>0, NULL );
76             task_list list;
77             task* t; 
78             size_t k=0; 
79             for(;;) {
80                 t = new( allocate_child() ) iteration_type(my_arg[k],my_body); 
81                 if( ++k==size ) break;
82                 list.push_back(*t);
83             }
84             set_ref_count(int(k+1));
85             spawn(list);
86             spawn_and_wait_for_all(*t);
87             return NULL;
88         }
89         template<typename Stream, typename Body_> friend class while_task;
90     };
91     
92     //! For internal use only.
93     /** Gets block of iterations from a stream and packages them into a while_group_task.
94         @ingroup algorithms */
95     template<typename Stream, typename Body>
96     class while_task: public task {
97         Stream& my_stream;
98         const Body& my_body;
99         empty_task& my_barrier;
100         /*override*/ task* execute() {
101             typedef while_group_task<Body> block_type;
102             block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
103             size_t k=0; 
104             while( my_stream.pop_if_present(t.my_arg[k]) ) {
105                 if( ++k==block_type::max_arg_size ) {
106                     // There might be more iterations.
107                     recycle_to_reexecute();
108                     break;
109                 }
110             }
111             if( k==0 ) {
112                 destroy(t);
113                 return NULL;
114             } else {
115                 t.size = k;
116                 return &t;
117             }
118         }
119         while_task( Stream& stream, const Body& body, empty_task& barrier ) : 
120             my_stream(stream),
121             my_body(body),
122             my_barrier(barrier)
123         {} 
124         friend class tbb::parallel_while<Body>;
125     };
126
127 } // namespace internal
128 //! @endcond
129
130 //! Parallel iteration over a stream, with optional addition of more work.
131 /** The Body b has the requirement: \n
132         "b(v)"                      \n
133         "b.argument_type"           \n
134     where v is an argument_type
135     @ingroup algorithms */
136 template<typename Body>
137 class parallel_while: internal::no_copy {
138 public:
139     //! Construct empty non-running parallel while.
140     parallel_while() : my_body(NULL), my_barrier(NULL) {}
141
142     //! Destructor cleans up data members before returning.
143     ~parallel_while() {
144         if( my_barrier ) {
145             my_barrier->destroy(*my_barrier);    
146             my_barrier = NULL;
147         }
148     }
149
150     //! Type of items
151     typedef typename Body::argument_type value_type;
152
153     //! Apply body.apply to each item in the stream.
154     /** A Stream s has the requirements \n
155          "S::value_type"                \n
156          "s.pop_if_present(value) is convertible to bool */
157     template<typename Stream>
158     void run( Stream& stream, const Body& body );
159
160     //! Add a work item while running.
161     /** Should be executed only by body.apply or a thread spawned therefrom. */
162     void add( const value_type& item );
163
164 private:
165     const Body* my_body;
166     empty_task* my_barrier;
167 };
168
169 template<typename Body>
170 template<typename Stream>
171 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
172     using namespace internal;
173     empty_task& barrier = *new( task::allocate_root() ) empty_task();
174     my_body = &body;
175     my_barrier = &barrier;
176     my_barrier->set_ref_count(2);
177     while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
178     my_barrier->spawn_and_wait_for_all(w);
179     my_barrier->destroy(*my_barrier);
180     my_barrier = NULL;
181     my_body = NULL;
182 }
183
184 template<typename Body>
185 void parallel_while<Body>::add( const value_type& item ) {
186     __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
187     typedef internal::while_iteration_task<Body> iteration_type;
188     iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
189     task::self().spawn( i );
190 }
191
192 } // namespace 
193
194 #endif /* __TBB_parallel_while */