/*
    Copyright 2005-2011 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software
    library without restriction.  Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License.  This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/

#ifndef __TBB_concurrent_queue_internal_H
#define __TBB_concurrent_queue_internal_H

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>      // for placement new
#include <cstring>  // for memset

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
    #pragma warning (push)
    #pragma warning (disable: 4530)
#endif

#include <iterator>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    #pragma warning (pop)
#endif

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

namespace deprecated {
template<typename T, typename A> class concurrent_queue;
}
#endif

//! For internal use only.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue;
template<typename T> class micro_queue_pop_finalizer;
template<typename T> class concurrent_queue_base_v3;

//! Parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
};

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
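
// A page pointer can hold two sentinel values that are never real pages:
//   NULL         - no page at all (an empty micro_queue), and
//   uintptr_t(1) - the "invalid page" installed by invalidate_page_and_rethrow()
//                  below after a failed push, which poisons the queue.
// Hence "valid" means numerically greater than 1. Illustrative checks:
//
//     is_valid_page( NULL );                   // false: empty
//     is_valid_page( (page*)uintptr_t(1) );    // false: poisoned
//     is_valid_page( freshly_allocated_page ); // true: a real heap page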

//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue;
    template<typename T> friend class micro_queue_pop_finalizer;
protected:
    virtual ~concurrent_queue_page_allocator() {}
public:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
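
// Illustrative sketch (not part of the original comments): the page layout
// that get_ref() relies on. A page is one raw block consisting of the `page`
// header followed immediately by items_per_page slots of T, where
// padded_page::last names slot 0:
//
//     | page header (next, mask) | T #0 == padded_page::last | T #1 | ... |
//
// so get_ref(p,index) is (&padded->last)[index]. Bit `index` of page::mask
// records whether that slot holds a successfully constructed item.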

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&0x1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
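
// Worked example (illustrative, assuming the defaults n_queue==8 and
// items_per_page==32): a producer holding raw ticket 43 reaches this
// micro_queue only if choose(43) selected it; inside push,
//
//     k &= -8;               // k == 40: tickets of one micro_queue are
//                            // consecutive multiples of n_queue
//     index = 40/8 & (32-1); // == 5: the sixth slot of the current page
//
// and since every push advances tail_counter by n_queue, waiting for
// tail_counter==k serializes producers of this micro_queue in ticket order.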

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
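
// Note on the mask test above (descriptive, not original text): push() sets
// bit `index` only after copy_item succeeded, so a clear bit marks a slot
// whose construction threw. pop() then returns false for that ticket and
// decrements n_invalid_entries, and the finalizer still runs: its destructor
// publishes the advanced head_counter and, once the last slot of a page
// (index==items_per_page-1) has been consumed, hands the page back to the
// allocator.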

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;

    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep;
template<typename T> class concurrent_queue_iterator_base_v3;

//! Representation of concurrent_queue_base
/**
 * The class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s.
 */
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
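
// Worked example (illustrative): with phi==3 and n_queue==8, gcd(3,8)==1,
// so k*phi%n_queue permutes any 8 consecutive tickets over all 8 sub-queues:
//
//     k     : 0  1  2  3  4  5  6  7
//     index : 0  3  6  1  4  7  2  5
//
// Consecutive pushes therefore land on different micro_queues, spreading
// contention across eight independently locked sub-queues.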

//! Base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an instance of concurrent_queue_rep.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;

    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }
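
    // Size arithmetic sketch (illustrative; figures assume a 64-bit build with
    // a 16-byte page header and no extra struct padding): for T==double with
    // items_per_page==32,
    //
    //     n = sizeof(padded_page) + 31*sizeof(T)
    //       = (16+8) + 248 = 272 bytes,
    //
    // i.e. the header plus exactly items_per_page slots, because
    // sizeof(padded_page) already accounts for the one T named `last`.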

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! Check if the queue is empty; thread-safe
    bool internal_empty() const ;

    //! Free any remaining pages
    /* Note: the name may be misleading, but it is kept for historical reasons. */
    void internal_finish_clear() ;

    //! Obsolete
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy internal representation
    void assign( const concurrent_queue_base_v3& src ) ;
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
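
// Illustrative consequence of the cascade above: for items up to 256 bytes,
// page payload stays between 128 and 256 bytes regardless of T, e.g.
//
//     sizeof(T)==4   -> 32 items/page (128 bytes of payload)
//     sizeof(T)==16  -> 16 items/page (256 bytes)
//     sizeof(T)==256 ->  1 item/page  (256 bytes)
//
// amortizing one page allocation over many small items while keeping pages
// of large items small.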

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk = k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
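
// Race sketch (illustrative): consumers A and B both read head_counter==5.
// atomic<T>::compare_and_swap(new_value,comparand) returns the value observed
// before the swap, so exactly one of them claims ticket 5:
//
//     A: r.head_counter.compare_and_swap(6,5) -> 5   (k==tk: A owns ticket 5)
//     B: r.head_counter.compare_and_swap(6,5) -> 6   (k!=tk: B retries, k==6)
//
// The outer do-while re-runs only when the claimed ticket referred to an
// invalid entry that pop() skipped.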

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
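
// Numeric example (illustrative): with operations in flight, one snapshot may
// observe hc==10, nie==3, tc==12, giving sz == 12-10-3 == -1; the clamp above
// (sz<0 ? 0 : size_t(sz)) keeps the returned unsigned size sane, which is why
// the subtraction is done in signed ptrdiff_t first.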

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this );

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];

    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}

//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
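
// Usage sketch (illustrative):
//
//     tbb_remove_cv<const int>::type          // int
//     tbb_remove_cv<volatile int>::type       // int
//     tbb_remove_cv<const volatile int>::type // int
//     tbb_remove_cv<int>::type                // int (primary template)
//
// Only top-level qualifiers are stripped: tbb_remove_cv<const int*>::type is
// still const int*, matching std::remove_cv.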

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
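
// Usage sketch (illustrative, not part of this header): these iterators are
// not constructed directly; the containers expose them through their
// debugging interface, e.g. assuming tbb::concurrent_queue<T>'s
// unsafe_begin()/unsafe_end() declared in concurrent_queue.h:
//
//     tbb::concurrent_queue<int> q;
//     q.push(1); q.push(2);
//     for( tbb::concurrent_queue<int>::const_iterator it = q.unsafe_begin();
//          it != q.unsafe_end(); ++it )
//         std::printf( "%d\n", *it );  // not thread safe; debugging only
//
// operator* returns my_item as set by get_item(), and operator++ walks the
// captured pages via concurrent_queue_iterator_base_v3<Value>::advance().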

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! For internal use only.
/** Type-independent portion of concurrent_queue.
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

#if __TBB_GCC_3_3_PROTECTED_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages
    /* Note: the name may be misleading, but it is kept for historical reasons. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace tbb

#endif /* __TBB_concurrent_queue_internal_H */