/*
    Copyright 2005-2010 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software
    library without restriction.  Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License.  This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/
#ifndef __TBB_concurrent_queue_internal_H
#define __TBB_concurrent_queue_internal_H

#include "tbb_stddef.h"
#include "tbb_machine.h"
#include "atomic.h"
#include "spin_mutex.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include <new>
#if !TBB_USE_EXCEPTIONS && _MSC_VER
    // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
    #pragma warning (push)
    #pragma warning (disable: 4530)
#endif

#include <iterator>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    #pragma warning (pop)
#endif
namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

namespace deprecated {
template<typename T, typename A> class concurrent_queue;
}
#endif

//! For internal use only.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;
template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
//! Parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };
    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
};
inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
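
// Illustrative note (not part of the interface contract): page pointers use two
// reserved values. NULL (0) means "no page", and address 1 is the "invalid page"
// sentinel appended by invalidate_page_and_rethrow() after a failed allocation,
// so both compare as invalid here:
//
//     is_valid_page( NULL );                                  // false
//     is_valid_page( (concurrent_queue_rep_base::page*)1 );   // false: poisoned queue
//     is_valid_page( some_real_page );                        // true for any real address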
//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif
//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };
    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);   // destroys the source item even if the assignment below throws
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }
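
    // Layout sketch (an illustration, not additional machinery): each page is
    // allocated as one raw block big enough for the page header plus
    // items_per_page items of T, and padded_page::last marks where the items
    // begin. get_ref(p,i) therefore treats the memory after the header as a T array:
    //
    //     [ page header | T #0 | T #1 | ... | T #(items_per_page-1) ]
    //                     ^-- &padded_page::last == &get_ref(p,0)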
    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&1 ) {
            // An odd counter is left only by invalidate_page_and_rethrow,
            // which marks the micro_queue as broken by a failed allocation.
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        p->mask |= uintptr_t(1)<<index;
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
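
// Worked example (illustrative numbers): with n_queue==8 and items_per_page==32,
// global tickets 5, 13, 21, ... all land in the same micro_queue. Inside push,
// k &= -8 turns them into the lane-local sequence 0, 8, 16, ..., and
// index = k/8 & 31 yields slots 0, 1, 2, ... on the current page; a fresh page
// is allocated exactly when index wraps back to 0.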
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        tail_counter = k+concurrent_queue_rep_base::n_queue+1;
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    my_queue.head_counter = my_ticket;
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back
template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;
//! Representation of concurrent_queue_base
/**
 * The class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
 */
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
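
// Worked example (illustrative): with phi==3 and n_queue==8, consecutive tickets
// map to lanes 0, 3, 6, 1, 4, 7, 2, 5, 0, ... Because 3 and 8 are coprime, all
// eight micro_queues are visited before any lane repeats, spreading concurrent
// pushes and pops across lanes instead of contending on a single one.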
//! Base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an instance of concurrent_queue_rep.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }
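
    // Size arithmetic (an illustrative note): sizeof(padded_page) already
    // includes storage for one T (the `last` field), so
    //     n = sizeof(padded_page) + (items_per_page-1)*sizeof(T)
    // covers the page header plus exactly items_per_page items. For example,
    // with sizeof(T)==8 and items_per_page==32, n is the header size plus 256 bytes.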
    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }
    //! Enqueue item at tail of queue
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;
    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! Check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! Free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! Throw an exception
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy internal representation
    void assign( const concurrent_queue_base_v3& src ) ;
};
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<=8 ? 32 :
        item_size<=16 ? 16 :
        item_size<=32 ? 8 :
        item_size<=64 ? 4 :
        item_size<=128 ? 2 :
        1;
}
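
// The resulting page granularity (an illustrative summary of the chain above):
//
//     item_size (bytes)   1-8   9-16   17-32   33-64   65-128   >128
//     items_per_page       32     16       8       4        2      1
//
// so a page carries at most 256 bytes of payload, or a single item when the
// item itself is larger than that.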
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
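
// Ticket flow (illustrative): a consumer first reserves ticket k by advancing
// head_counter with compare_and_swap, then asks the owning micro_queue for the
// item. If that slot turns out to be invalid (its producer threw while copying),
// pop() returns false and the outer do-while claims the next ticket, so one bad
// entry never blocks the consumer.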
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
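
// Arithmetic example (illustrative numbers): tail_counter==10, head_counter==3
// and n_invalid_entries==1 give 10-3-1 == 6 live items. Under concurrent
// modification the unsynchronized reads can make the difference transiently
// negative, which is why the result is clamped to 0.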
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
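
// Snapshot check (illustrative): tail_counter is read twice, before and after
// head_counter. If the two reads agree, the head/tail pair is a consistent
// snapshot and "tc == hc + invalid" really means empty; if they disagree, a
// push happened in between, so reporting non-empty is correct for that moment.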
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}
template<typename Container, typename Value> class concurrent_queue_iterator;
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}
//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

public:
    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __GNUC__==4&&__GNUC_MINOR__==3
        // to get around a possible gcc 4.3 bug
        __asm__ __volatile__("": : :"memory");
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

protected:
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
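
// Usage sketch (illustrative): both the mutable and the const iterator share one
// base instantiated on the plain element type, e.g.
//
//     tbb_remove_cv<const int>::type       // int
//     tbb_remove_cv<volatile int>::type    // int
//
// so concurrent_queue_iterator<Q,const T> and concurrent_queue_iterator<Q,T>
// both derive from the same concurrent_queue_iterator_base_v3<T>.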
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {
class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;
//! For internal use only.
/** Type-independent portion of concurrent_queue.
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;

protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;
#if __TBB_GCC_3_3_PROTECTED_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

public:
    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

protected:
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};
typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}
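
    // Offset note (illustrative): __TBB_offsetof(padded_page<Value>,last) is the
    // byte distance from the start of a page to its first item, so the
    // type-independent base class can step through items of any Value type
    // without knowing Value itself; only this header knows sizeof(Value) and
    // the offset at which the items begin.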
    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace tbb

#endif /* __TBB_concurrent_queue_internal_H */