/*
    Copyright 2005-2011 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software
    library without restriction.  Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License.  This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/
#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif
#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#if !TBB_USE_EXCEPTIONS && _MSC_VER
    // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
    #pragma warning (push)
    #pragma warning (disable: 4530)
#endif

#include <new>
#include <iterator>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    #pragma warning (pop)
#endif

namespace tbb {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

namespace deprecated {
template<typename T, typename A> class concurrent_queue;
}
#endif
//! For internal use only.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;
template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
//! parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };
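    // The counters below drive the whole queue: each push or pop claims a
    // ticket by atomically advancing tail_counter or head_counter.  Tickets
    // are striped across the n_queue micro_queues (see
    // concurrent_queue_rep::index), so concurrent operations mostly land on
    // different sub-queues.  Within a page, bit i of `mask` is set iff slot i
    // holds a successfully constructed item.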
    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
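// Besides real addresses, page pointers can hold two sentinel values: NULL
// ("no page") and address 1, the deliberately invalid page installed by
// invalidate_page_and_rethrow() to stop further pushes after a failure.
// Hence the ">1" test above.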
//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
public:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif
//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };
    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }
    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }
    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&0x1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
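    // The ticket k arrives as a global ticket; clearing its low bits yields
    // this micro_queue's local tail ticket, which advances in steps of
    // n_queue (for the power-of-two n_queue, -n_queue == ~(n_queue-1)).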
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }
    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
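        // The finalizer's destructor runs even if the assignment below
        // throws: it advances head_counter and, when this pop consumed the
        // last slot of the page, unlinks and deallocates the page.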
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;
            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
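    // Only slots whose mask bit is set hold constructed items; slots left
    // invalid by a failed push are skipped rather than copied.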
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
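    // The page was unlinked while holding page_mutex; only now is
    // head_counter advanced with a release store, so the next pop on this
    // micro_queue observes the updated head_page.  The page itself is freed
    // after the counter is released, outside the lock.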
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back
template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;
//! representation of concurrent_queue_base
/**
 * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
 */
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];
    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }
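    // Because phi (3) is coprime to n_queue (8), consecutive tickets visit
    // all eight micro_queues before repeating: tickets 0..7 map to indices
    // 0, 3, 6, 1, 4, 7, 2, 5.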
    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
//! base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an instance of concurrent_queue_rep.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;
protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }
    //! Enqueue item at tail of queue
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! copy internal representation
    void assign( const concurrent_queue_base_v3& src ) ;
};
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
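    // Pick items_per_page (always a power of 2) so a page carries roughly
    // 256 bytes of payload: small items share a page; items larger than
    // 128 bytes get a page each.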
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
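    // Claim protocol: read head_counter, then try to claim ticket k by
    // advancing it with compare_and_swap.  If the claimed slot turns out to
    // be an invalid entry (its push failed), pop() returns false and the
    // outer loop claims the next ticket.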
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}
template<typename Container, typename Value> class concurrent_queue_iterator;
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}
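// N.B. The iterators below walk pages without any synchronization, so they
// are intended for situations (e.g. debugging) where the queue is not being
// modified concurrently.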
//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }

    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {
class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;
//! For internal use only.
/** Type-independent portion of concurrent_queue.
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;

protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };
    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

#if __TBB_GCC_3_3_PROTECTED_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }
    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};
typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}
    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal;

//! @endcond

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */