]> git.sesse.net Git - casparcg/blob - tbb30_20100406oss/include/tbb/concurrent_queue.h
c137fb391b1670ac9cf8bed0d315604a2bfdf8c7
[casparcg] / tbb30_20100406oss / include / tbb / concurrent_queue.h
1 /*
2     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_concurrent_queue_H
30 #define __TBB_concurrent_queue_H
31
32 #include "_concurrent_queue_internal.h"
33
34 namespace tbb {
35
36 namespace strict_ppl {
37
38 //! A high-performance thread-safe non-blocking concurrent queue.
39 /** Multiple threads may each push and pop concurrently.
40     Assignment construction is not allowed.
41     @ingroup containers */
42 template<typename T, typename A = cache_aligned_allocator<T> > 
43 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
44     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
45
46     //! Allocator type
47     typedef typename A::template rebind<char>::other page_allocator_type;
48     page_allocator_type my_allocator;
49
50     //! Allocates a block of size n (bytes)
51     /*overide*/ virtual void *allocate_block( size_t n ) {
52         void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
53         if( !b )
54             internal::throw_exception(internal::eid_bad_alloc); 
55         return b;
56     }
57
58     //! Deallocates block created by allocate_block.
59     /*override*/ virtual void deallocate_block( void *b, size_t n ) {
60         my_allocator.deallocate( reinterpret_cast<char*>(b), n );
61     }
62
63 public:
64     //! Element type in the queue.
65     typedef T value_type;
66
67     //! Reference type
68     typedef T& reference;
69
70     //! Const reference type
71     typedef const T& const_reference;
72
73     //! Integral type for representing size of the queue.
74     typedef size_t size_type;
75
76     //! Difference type for iterator
77     typedef ptrdiff_t difference_type;
78
79     //! Allocator type
80     typedef A allocator_type;
81
82     //! Construct empty queue
83     explicit concurrent_queue(const allocator_type& a = allocator_type()) : 
84         my_allocator( a )
85     {
86     }
87
88     //! [begin,end) constructor
89     template<typename InputIterator>
90     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
91         my_allocator( a )
92     {
93         for( ; begin != end; ++begin )
94             internal_push(&*begin);
95     }
96     
97     //! Copy constructor
98     concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) : 
99         internal::concurrent_queue_base_v3<T>(), my_allocator( a )
100     {
101         assign( src );
102     }
103     
104     //! Destroy queue
105     ~concurrent_queue();
106
107     //! Enqueue an item at tail of queue.
108     void push( const T& source ) {
109         internal_push( &source );
110     }
111
112     //! Attempt to dequeue an item from head of queue.
113     /** Does not wait for item to become available.
114         Returns true if successful; false otherwise. */
115     bool try_pop( T& result ) {
116         return internal_try_pop( &result );
117     }
118
119     //! Return the number of items in the queue; thread unsafe
120     size_type unsafe_size() const {return this->internal_size();}
121
122     //! Equivalent to size()==0.
123     bool empty() const {return this->internal_empty();}
124
125     //! Clear the queue. not thread-safe.
126     void clear() ;
127
128     //! Return allocator object
129     allocator_type get_allocator() const { return this->my_allocator; }
130
131     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
132     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
133
134     //------------------------------------------------------------------------
135     // The iterators are intended only for debugging.  They are slow and not thread safe.
136     //------------------------------------------------------------------------
137     iterator unsafe_begin() {return iterator(*this);}
138     iterator unsafe_end() {return iterator();}
139     const_iterator unsafe_begin() const {return const_iterator(*this);}
140     const_iterator unsafe_end() const {return const_iterator();}
141 } ;
142
143 template<typename T, class A>
144 concurrent_queue<T,A>::~concurrent_queue() {
145     clear();
146     this->internal_finish_clear();
147 }
148
149 template<typename T, class A>
150 void concurrent_queue<T,A>::clear() {
151     while( !empty() ) {
152         T value;
153         this->internal_try_pop(&value);
154     }
155 }
156
157 } // namespace strict_ppl
158     
159 //! A high-performance thread-safe blocking concurrent bounded queue.
160 /** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
161     Note that method names agree with the PPL-style concurrent queue.
162     Multiple threads may each push and pop concurrently.
163     Assignment construction is not allowed.
164     @ingroup containers */
165 template<typename T, class A = cache_aligned_allocator<T> >
166 class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
167     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
168
169     //! Allocator type
170     typedef typename A::template rebind<char>::other page_allocator_type;
171     page_allocator_type my_allocator;
172
173     typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
174  
175     //! Class used to ensure exception-safety of method "pop" 
176     class destroyer: internal::no_copy {
177         T& my_value;
178     public:
179         destroyer( T& value ) : my_value(value) {}
180         ~destroyer() {my_value.~T();}          
181     };
182
183     T& get_ref( page& p, size_t index ) {
184         __TBB_ASSERT( index<items_per_page, NULL );
185         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
186     }
187
188     /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
189         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
190     }
191
192     /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
193         new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
194     }
195
196     /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
197         T& from = get_ref(src,index);
198         destroyer d(from);
199         *static_cast<T*>(dst) = from;
200     }
201
202     /*overide*/ virtual page *allocate_page() {
203         size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
204         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
205         if( !p )
206             internal::throw_exception(internal::eid_bad_alloc); 
207         return p;
208     }
209
210     /*override*/ virtual void deallocate_page( page *p ) {
211         size_t n = sizeof(padded_page) + items_per_page*sizeof(T);
212         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
213     }
214
215 public:
216     //! Element type in the queue.
217     typedef T value_type;
218
219     //! Allocator type
220     typedef A allocator_type;
221
222     //! Reference type
223     typedef T& reference;
224
225     //! Const reference type
226     typedef const T& const_reference;
227
228     //! Integral type for representing size of the queue.
229     /** Notice that the size_type is a signed integral type.
230         This is because the size can be negative if there are pending pops without corresponding pushes. */
231     typedef std::ptrdiff_t size_type;
232
233     //! Difference type for iterator
234     typedef std::ptrdiff_t difference_type;
235
236     //! Construct empty queue
237     explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) : 
238         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
239     {
240     }
241
242     //! Copy constructor
243     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) : 
244         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
245     {
246         assign( src );
247     }
248
249     //! [begin,end) constructor
250     template<typename InputIterator>
251     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
252         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
253     {
254         for( ; begin != end; ++begin )
255             internal_push_if_not_full(&*begin);
256     }
257
258     //! Destroy queue
259     ~concurrent_bounded_queue();
260
261     //! Enqueue an item at tail of queue.
262     void push( const T& source ) {
263         internal_push( &source );
264     }
265
266     //! Dequeue item from head of queue.
267     /** Block until an item becomes available, and then dequeue it. */
268     void pop( T& destination ) {
269         internal_pop( &destination );
270     }
271
272     //! Enqueue an item at tail of queue if queue is not already full.
273     /** Does not wait for queue to become not full.
274         Returns true if item is pushed; false if queue was already full. */
275     bool try_push( const T& source ) {
276         return internal_push_if_not_full( &source );
277     }
278
279     //! Attempt to dequeue an item from head of queue.
280     /** Does not wait for item to become available.
281         Returns true if successful; false otherwise. */
282     bool try_pop( T& destination ) {
283         return internal_pop_if_present( &destination );
284     }
285
286     //! Return number of pushes minus number of pops.
287     /** Note that the result can be negative if there are pops waiting for the 
288         corresponding pushes.  The result can also exceed capacity() if there 
289         are push operations in flight. */
290     size_type size() const {return internal_size();}
291
292     //! Equivalent to size()<=0.
293     bool empty() const {return internal_empty();}
294
295     //! Maximum number of allowed elements
296     size_type capacity() const {
297         return my_capacity;
298     }
299
300     //! Set the capacity
301     /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
302         and subsequent push operations to block forever. */
303     void set_capacity( size_type new_capacity ) {
304         internal_set_capacity( new_capacity, sizeof(T) );
305     }
306
307     //! return allocator object
308     allocator_type get_allocator() const { return this->my_allocator; }
309
310     //! clear the queue. not thread-safe.
311     void clear() ;
312
313     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
314     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
315
316     //------------------------------------------------------------------------
317     // The iterators are intended only for debugging.  They are slow and not thread safe.
318     //------------------------------------------------------------------------
319     iterator unsafe_begin() {return iterator(*this);}
320     iterator unsafe_end() {return iterator();}
321     const_iterator unsafe_begin() const {return const_iterator(*this);}
322     const_iterator unsafe_end() const {return const_iterator();}
323
324 }; 
325
326 template<typename T, class A>
327 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
328     clear();
329     internal_finish_clear();
330 }
331
332 template<typename T, class A>
333 void concurrent_bounded_queue<T,A>::clear() {
334     while( !empty() ) {
335         T value;
336         internal_pop_if_present(&value);
337     }
338 }
339
340 namespace deprecated {
341
342 //! A high-performance thread-safe blocking concurrent bounded queue.
343 /** This is the pre-PPL TBB concurrent queue which support boundedness and blocking semantics.
344     Note that method names agree with the PPL-style concurrent queue.
345     Multiple threads may each push and pop concurrently.
346     Assignment construction is not allowed.
347     @ingroup containers */
348 template<typename T, class A = cache_aligned_allocator<T> > 
349 class concurrent_queue: public concurrent_bounded_queue<T,A> {
350 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
351     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
352 #endif 
353
354 public:
355     //! Construct empty queue
356     explicit concurrent_queue(const A& a = A()) : 
357         concurrent_bounded_queue<T,A>( a )
358     {
359     }
360
361     //! Copy constructor
362     concurrent_queue( const concurrent_queue& src, const A& a = A()) : 
363         concurrent_bounded_queue<T,A>( src, a )
364     {
365     }
366
367     //! [begin,end) constructor
368     template<typename InputIterator>
369     concurrent_queue( InputIterator b /*begin*/, InputIterator e /*end*/, const A& a = A()) :
370         concurrent_bounded_queue<T,A>( b, e, a )
371     {
372     }
373
374     //! Enqueue an item at tail of queue if queue is not already full.
375     /** Does not wait for queue to become not full.
376         Returns true if item is pushed; false if queue was already full. */
377     bool push_if_not_full( const T& source ) {
378         return try_push( source );
379     }
380
381     //! Attempt to dequeue an item from head of queue.
382     /** Does not wait for item to become available.
383         Returns true if successful; false otherwise. 
384         @deprecated Use try_pop()
385         */
386     bool pop_if_present( T& destination ) {
387         return try_pop( destination );
388     }
389
390     typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
391     typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
392     //
393     //------------------------------------------------------------------------
394     // The iterators are intended only for debugging.  They are slow and not thread safe.
395     //------------------------------------------------------------------------
396     iterator begin() {return this->unsafe_begin();}
397     iterator end() {return this->unsafe_end();}
398     const_iterator begin() const {return this->unsafe_begin();}
399     const_iterator end() const {return this->unsafe_end();}
400 }; 
401
402 }
403     
404
405 #if TBB_DEPRECATED
406 using deprecated::concurrent_queue;
407 #else
408 using strict_ppl::concurrent_queue;    
409 #endif
410
411 } // namespace tbb
412
413 #endif /* __TBB_concurrent_queue_H */