]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/internal/_concurrent_queue_impl.h
4d6adbbd2945ca9a2a744d0310b74c3daf7913e1
[casparcg] / dependencies64 / tbb / include / tbb / internal / _concurrent_queue_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB__concurrent_queue_impl_H
30 #define __TBB__concurrent_queue_impl_H
31
32 #ifndef __TBB_concurrent_queue_H
33 #error Do not #include this internal file directly; use public TBB headers instead.
34 #endif
35
36 #include "../tbb_stddef.h"
37 #include "../tbb_machine.h"
38 #include "../atomic.h"
39 #include "../spin_mutex.h"
40 #include "../cache_aligned_allocator.h"
41 #include "../tbb_exception.h"
42 #include "../tbb_profiling.h"
43 #include <new>
44
45 #if !TBB_USE_EXCEPTIONS && _MSC_VER
46     // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
47     #pragma warning (push)
48     #pragma warning (disable: 4530)
49 #endif
50
51 #include <iterator>
52
53 #if !TBB_USE_EXCEPTIONS && _MSC_VER
54     #pragma warning (pop)
55 #endif
56
57 namespace tbb {
58
59 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
60
61 // forward declaration
62 namespace strict_ppl {
63 template<typename T, typename A> class concurrent_queue;
64 }
65
66 template<typename T, typename A> class concurrent_bounded_queue;
67
68 namespace deprecated {
69 template<typename T, typename A> class concurrent_queue;
70 }
71 #endif
72
73 //! For internal use only.
74 namespace strict_ppl {
75
76 //! @cond INTERNAL
77 namespace internal {
78
79 using namespace tbb::internal;
80
81 typedef size_t ticket;
82
83 template<typename T> class micro_queue ;
84 template<typename T> class micro_queue_pop_finalizer ;
85 template<typename T> class concurrent_queue_base_v3;
86
87 //! parts of concurrent_queue_rep that do not have references to micro_queue
88 /**
89  * For internal use only.
90  */
91 struct concurrent_queue_rep_base : no_copy {
92     template<typename T> friend class micro_queue;
93     template<typename T> friend class concurrent_queue_base_v3;
94
95 protected:
96     //! Approximately n_queue/golden ratio
97     static const size_t phi = 3;
98
99 public:
100     // must be power of 2
101     static const size_t n_queue = 8;
102
103     //! Prefix on a page
104     struct page {
105         page* next;
106         uintptr_t mask; 
107     };
108
109     atomic<ticket> head_counter;
110     char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
111     atomic<ticket> tail_counter;
112     char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
113
114     //! Always a power of 2
115     size_t items_per_page;
116
117     //! Size of an item
118     size_t item_size;
119
120     //! number of invalid entries in the queue
121     atomic<size_t> n_invalid_entries;
122
123     char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
124 } ;
125
126 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
127     return uintptr_t(p)>1;
128 }
129
130 //! Abstract class to define interface for page allocation/deallocation
131 /**
132  * For internal use only.
133  */
134 class concurrent_queue_page_allocator
135 {
136     template<typename T> friend class micro_queue ;
137     template<typename T> friend class micro_queue_pop_finalizer ;
138 protected:
139     virtual ~concurrent_queue_page_allocator() {}
140 private:
141     virtual concurrent_queue_rep_base::page* allocate_page() = 0;
142     virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
143 } ;
144
145 #if _MSC_VER && !defined(__INTEL_COMPILER)
146 // unary minus operator applied to unsigned type, result still unsigned
147 #pragma warning( push )
148 #pragma warning( disable: 4146 )
149 #endif
150
151 //! A queue using simple locking.
152 /** For efficiency, this class has no constructor.
153     The caller is expected to zero-initialize it. */
154 template<typename T>
155 class micro_queue : no_copy {
156     typedef concurrent_queue_rep_base::page page;
157
158     //! Class used to ensure exception-safety of method "pop" 
159     class destroyer: no_copy {
160         T& my_value;
161     public:
162         destroyer( T& value ) : my_value(value) {}
163         ~destroyer() {my_value.~T();}          
164     };
165
166     void copy_item( page& dst, size_t index, const void* src ) {
167         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
168     }
169
170     void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
171         new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
172     }
173
174     void assign_and_destroy_item( void* dst, page& src, size_t index ) {
175         T& from = get_ref(src,index);
176         destroyer d(from);
177         *static_cast<T*>(dst) = from;
178     }
179
180     void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
181
182 public:
183     friend class micro_queue_pop_finalizer<T>;
184
185     struct padded_page: page {
186         //! Not defined anywhere - exists to quiet warnings.
187         padded_page(); 
188         //! Not defined anywhere - exists to quiet warnings.
189         void operator=( const padded_page& );
190         //! Must be last field.
191         T last;
192     };
193
194     static T& get_ref( page& p, size_t index ) {
195         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
196     }
197
198     atomic<page*> head_page;
199     atomic<ticket> head_counter;
200
201     atomic<page*> tail_page;
202     atomic<ticket> tail_counter;
203
204     spin_mutex page_mutex;
205     
206     void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
207
208     bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
209
210     micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
211
212     page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
213
214     void invalidate_page_and_rethrow( ticket k ) ;
215 };
216
217 template<typename T>
218 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
219     atomic_backoff backoff;
220     do {
221         backoff.pause();
222         if( counter&1 ) {
223             ++rb.n_invalid_entries;
224             throw_exception( eid_bad_last_alloc );
225         }
226     } while( counter!=k ) ;
227 }
228
229 template<typename T>
230 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
231     k &= -concurrent_queue_rep_base::n_queue;
232     page* p = NULL;
233     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
234     if( !index ) {
235         __TBB_TRY {
236             concurrent_queue_page_allocator& pa = base;
237             p = pa.allocate_page();
238         } __TBB_CATCH (...) {
239             ++base.my_rep->n_invalid_entries;
240             invalidate_page_and_rethrow( k );
241         }
242         p->mask = 0;
243         p->next = NULL;
244     }
245
246     if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
247     call_itt_notify(acquired, &tail_counter);
248         
249     if( p ) {
250         spin_mutex::scoped_lock lock( page_mutex );
251         page* q = tail_page;
252         if( is_valid_page(q) )
253             q->next = p;
254         else
255             head_page = p; 
256         tail_page = p;
257     } else {
258         p = tail_page;
259     }
260     __TBB_TRY {
261         copy_item( *p, index, item );
262         // If no exception was thrown, mark item as present.
263         itt_hide_store_word(p->mask,  p->mask | uintptr_t(1)<<index);
264         call_itt_notify(releasing, &tail_counter);
265         tail_counter += concurrent_queue_rep_base::n_queue; 
266     } __TBB_CATCH (...) {
267         ++base.my_rep->n_invalid_entries;
268         call_itt_notify(releasing, &tail_counter);
269         tail_counter += concurrent_queue_rep_base::n_queue; 
270         __TBB_RETHROW();
271     }
272 }
273
274 template<typename T>
275 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
276     k &= -concurrent_queue_rep_base::n_queue;
277     if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
278     call_itt_notify(acquired, &head_counter);
279     if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
280     call_itt_notify(acquired, &tail_counter);
281     page& p = *head_page;
282     __TBB_ASSERT( &p, NULL );
283     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
284     bool success = false; 
285     {
286         micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); 
287         if( p.mask & uintptr_t(1)<<index ) {
288             success = true;
289             assign_and_destroy_item( dst, p, index );
290         } else {
291             --base.my_rep->n_invalid_entries;
292         }
293     }
294     return success;
295 }
296
297 template<typename T>
298 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
299     head_counter = src.head_counter;
300     tail_counter = src.tail_counter;
301     page_mutex   = src.page_mutex;
302
303     const page* srcp = src.head_page;
304     if( is_valid_page(srcp) ) {
305         ticket g_index = head_counter;
306         __TBB_TRY {
307             size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
308             size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
309             size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
310
311             head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
312             page* cur_page = head_page;
313
314             if( srcp != src.tail_page ) {
315                 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
316                     cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
317                     cur_page = cur_page->next;
318                 }
319
320                 __TBB_ASSERT( srcp==src.tail_page, NULL );
321                 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
322                 if( last_index==0 ) last_index = base.my_rep->items_per_page;
323
324                 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
325                 cur_page = cur_page->next;
326             }
327             tail_page = cur_page;
328         } __TBB_CATCH (...) {
329             invalidate_page_and_rethrow( g_index );
330         }
331     } else {
332         head_page = tail_page = NULL;
333     }
334     return *this;
335 }
336
337 template<typename T>
338 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
339     // Append an invalid page at address 1 so that no more pushes are allowed.
340     page* invalid_page = (page*)uintptr_t(1);
341     {
342         spin_mutex::scoped_lock lock( page_mutex );
343         itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
344         page* q = tail_page;
345         if( is_valid_page(q) )
346             q->next = invalid_page;
347         else
348             head_page = invalid_page;
349         tail_page = invalid_page;
350     }
351     __TBB_RETHROW();
352 }
353
354 template<typename T>
355 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
356     concurrent_queue_page_allocator& pa = base;
357     page* new_page = pa.allocate_page();
358     new_page->next = NULL;
359     new_page->mask = src_page->mask;
360     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
361         if( new_page->mask & uintptr_t(1)<<begin_in_page )
362             copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
363     return new_page;
364 }
365
366 template<typename T>
367 class micro_queue_pop_finalizer: no_copy {
368     typedef concurrent_queue_rep_base::page page;
369     ticket my_ticket;
370     micro_queue<T>& my_queue;
371     page* my_page; 
372     concurrent_queue_page_allocator& allocator;
373 public:
374     micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
375         my_ticket(k), my_queue(queue), my_page(p), allocator(b)
376     {}
377     ~micro_queue_pop_finalizer() ;
378 };
379
380 template<typename T>
381 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
382     page* p = my_page;
383     if( is_valid_page(p) ) {
384         spin_mutex::scoped_lock lock( my_queue.page_mutex );
385         page* q = p->next;
386         my_queue.head_page = q;
387         if( !is_valid_page(q) ) {
388             my_queue.tail_page = NULL;
389         }
390     }
391     itt_store_word_with_release(my_queue.head_counter, my_ticket);
392     if( is_valid_page(p) ) {
393         allocator.deallocate_page( p );
394     }
395 }
396
397 #if _MSC_VER && !defined(__INTEL_COMPILER)
398 #pragma warning( pop )
399 #endif // warning 4146 is back
400
401 template<typename T> class concurrent_queue_iterator_rep ;
402 template<typename T> class concurrent_queue_iterator_base_v3;
403
404 //! representation of concurrent_queue_base
405 /**
406  * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
407  */
408 template<typename T>
409 struct concurrent_queue_rep : public concurrent_queue_rep_base {
410     micro_queue<T> array[n_queue];
411
412     //! Map ticket to an array index
413     static size_t index( ticket k ) {
414         return k*phi%n_queue;
415     }
416
417     micro_queue<T>& choose( ticket k ) {
418         // The formula here approximates LRU in a cache-oblivious way.
419         return array[index(k)];
420     }
421 };
422
423 //! base class of concurrent_queue
424 /**
425  * The class implements the interface defined by concurrent_queue_page_allocator
426  * and has a pointer to an instance of concurrent_queue_rep.
427  */
428 template<typename T>
429 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
430     //! Internal representation
431     concurrent_queue_rep<T>* my_rep;
432
433     friend struct concurrent_queue_rep<T>;
434     friend class micro_queue<T>;
435     friend class concurrent_queue_iterator_rep<T>;
436     friend class concurrent_queue_iterator_base_v3<T>;
437
438 protected:
439     typedef typename concurrent_queue_rep<T>::page page;
440
441 private:
442     typedef typename micro_queue<T>::padded_page padded_page;
443
444     /* override */ virtual page *allocate_page() {
445         concurrent_queue_rep<T>& r = *my_rep;
446         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
447         return reinterpret_cast<page*>(allocate_block ( n ));
448     }
449
450     /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
451         concurrent_queue_rep<T>& r = *my_rep;
452         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
453         deallocate_block( reinterpret_cast<void*>(p), n );
454     }
455
456     //! custom allocator
457     virtual void *allocate_block( size_t n ) = 0;
458
459     //! custom de-allocator
460     virtual void deallocate_block( void *p, size_t n ) = 0;
461
462 protected:
463     concurrent_queue_base_v3();
464
465     /* override */ virtual ~concurrent_queue_base_v3() {
466 #if TBB_USE_ASSERT
467         size_t nq = my_rep->n_queue;
468         for( size_t i=0; i<nq; i++ )
469             __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
470 #endif /* TBB_USE_ASSERT */
471         cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
472     }
473
474     //! Enqueue item at tail of queue
475     void internal_push( const void* src ) {
476         concurrent_queue_rep<T>& r = *my_rep;
477         ticket k = r.tail_counter++;
478         r.choose(k).push( src, k, *this );
479     }
480
481     //! Attempt to dequeue item from queue.
482     /** NULL if there was no item to dequeue. */
483     bool internal_try_pop( void* dst ) ;
484
485     //! Get size of queue; result may be invalid if queue is modified concurrently
486     size_t internal_size() const ;
487
488     //! check if the queue is empty; thread safe
489     bool internal_empty() const ;
490
491     //! free any remaining pages
492     /* note that the name may be misleading, but it remains so due to a historical accident. */
493     void internal_finish_clear() ;
494
495     //! Obsolete
496     void internal_throw_exception() const {
497         throw_exception( eid_bad_alloc );
498     }
499
500     //! copy internal representation
501     void assign( const concurrent_queue_base_v3& src ) ;
502 };
503
504 template<typename T>
505 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
506     const size_t item_size = sizeof(T);
507     my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
508     __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
509     __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
510     __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
511     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
512     memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
513     my_rep->item_size = item_size;
514     my_rep->items_per_page = item_size<=8 ? 32 :
515                              item_size<=16 ? 16 : 
516                              item_size<=32 ? 8 :
517                              item_size<=64 ? 4 :
518                              item_size<=128 ? 2 :
519                              1;
520 }
521
522 template<typename T>
523 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
524     concurrent_queue_rep<T>& r = *my_rep;
525     ticket k;
526     do {
527         k = r.head_counter;
528         for(;;) {
529             if( r.tail_counter<=k ) {
530                 // Queue is empty 
531                 return false;
532             }
533             // Queue had item with ticket k when we looked.  Attempt to get that item.
534             ticket tk=k;
535 #if defined(_MSC_VER) && defined(_Wp64)
536     #pragma warning (push)
537     #pragma warning (disable: 4267)
538 #endif
539             k = r.head_counter.compare_and_swap( tk+1, tk );
540 #if defined(_MSC_VER) && defined(_Wp64)
541     #pragma warning (pop)
542 #endif
543             if( k==tk )
544                 break;
545             // Another thread snatched the item, retry.
546         }
547     } while( !r.choose( k ).pop( dst, k, *this ) );
548     return true;
549 }
550
551 template<typename T>
552 size_t concurrent_queue_base_v3<T>::internal_size() const {
553     concurrent_queue_rep<T>& r = *my_rep;
554     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
555     ticket hc = r.head_counter;
556     size_t nie = r.n_invalid_entries;
557     ticket tc = r.tail_counter;
558     __TBB_ASSERT( hc!=tc || !nie, NULL );
559     ptrdiff_t sz = tc-hc-nie;
560     return sz<0 ? 0 :  size_t(sz);
561 }
562
563 template<typename T>
564 bool concurrent_queue_base_v3<T>::internal_empty() const {
565     concurrent_queue_rep<T>& r = *my_rep;
566     ticket tc = r.tail_counter;
567     ticket hc = r.head_counter;
568     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
569     return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
570 }
571
572 template<typename T>
573 void concurrent_queue_base_v3<T>::internal_finish_clear() {
574     concurrent_queue_rep<T>& r = *my_rep;
575     size_t nq = r.n_queue;
576     for( size_t i=0; i<nq; ++i ) {
577         page* tp = r.array[i].tail_page;
578         if( is_valid_page(tp) ) {
579             __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
580             deallocate_page( tp );
581             r.array[i].tail_page = NULL;
582         } else 
583             __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
584     }
585 }
586
587 template<typename T>
588 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
589     concurrent_queue_rep<T>& r = *my_rep;
590     r.items_per_page = src.my_rep->items_per_page;
591
592     // copy concurrent_queue_rep.
593     r.head_counter = src.my_rep->head_counter;
594     r.tail_counter = src.my_rep->tail_counter;
595     r.n_invalid_entries = src.my_rep->n_invalid_entries;
596
597     // copy micro_queues
598     for( size_t i = 0; i<r.n_queue; ++i )
599         r.array[i].assign( src.my_rep->array[i], *this);
600
601     __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, 
602             "the source concurrent queue should not be concurrently modified." );
603 }
604
605 template<typename Container, typename Value> class concurrent_queue_iterator;
606
607 template<typename T>
608 class concurrent_queue_iterator_rep: no_assign {
609     typedef typename micro_queue<T>::padded_page padded_page;
610 public:
611     ticket head_counter;
612     const concurrent_queue_base_v3<T>& my_queue;
613     typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
614     concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
615         head_counter(queue.my_rep->head_counter),
616         my_queue(queue)
617     {
618         for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
619             array[k] = queue.my_rep->array[k].head_page;
620     }
621
622     //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
623     bool get_item( T*& item, size_t k ) ;
624 };
625
626 template<typename T>
627 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
628     if( k==my_queue.my_rep->tail_counter ) {
629         item = NULL;
630         return true;
631     } else {
632         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
633         __TBB_ASSERT(p,NULL);
634         size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
635         item = &micro_queue<T>::get_ref(*p,i);
636         return (p->mask & uintptr_t(1)<<i)!=0;
637     }
638 }
639
640 //! Constness-independent portion of concurrent_queue_iterator.
641 /** @ingroup containers */
642 template<typename Value>
643 class concurrent_queue_iterator_base_v3 : no_assign {
644     //! Represents concurrent_queue over which we are iterating.
645     /** NULL if one past last element in queue. */
646     concurrent_queue_iterator_rep<Value>* my_rep;
647
648     template<typename C, typename T, typename U>
649     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
650
651     template<typename C, typename T, typename U>
652     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
653 protected:
654     //! Pointer to current item
655     Value* my_item;
656
657     //! Default constructor
658     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
659 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
660         __TBB_compiler_fence();
661 #endif
662     }
663
664     //! Copy constructor
665     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
666     : no_assign(), my_rep(NULL), my_item(NULL) {
667         assign(i);
668     }
669
670     //! Construct iterator pointing to head of queue.
671     concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
672
673     //! Assignment
674     void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
675
676     //! Advance iterator one step towards tail of queue.
677     void advance() ;
678
679     //! Destructor
680     ~concurrent_queue_iterator_base_v3() {
681         cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
682         my_rep = NULL;
683     }
684 };
685
686 template<typename Value>
687 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
688     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
689     new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
690     size_t k = my_rep->head_counter;
691     if( !my_rep->get_item(my_item, k) ) advance();
692 }
693
694 template<typename Value>
695 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
696     if( my_rep!=other.my_rep ) {
697         if( my_rep ) {
698             cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
699             my_rep = NULL;
700         }
701         if( other.my_rep ) {
702             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
703             new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
704         }
705     }
706     my_item = other.my_item;
707 }
708
709 template<typename Value>
710 void concurrent_queue_iterator_base_v3<Value>::advance() {
711     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );  
712     size_t k = my_rep->head_counter;
713     const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
714 #if TBB_USE_ASSERT
715     Value* tmp;
716     my_rep->get_item(tmp,k);
717     __TBB_ASSERT( my_item==tmp, NULL );
718 #endif /* TBB_USE_ASSERT */
719     size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
720     if( i==queue.my_rep->items_per_page-1 ) {
721         typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
722         root = root->next;
723     }
724     // advance k
725     my_rep->head_counter = ++k;
726     if( !my_rep->get_item(my_item, k) ) advance();
727 }
728
729 //! Similar to C++0x std::remove_cv
730 /** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
731 template<typename T> struct tbb_remove_cv {typedef T type;};
732 template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
733 template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
734 template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
735
736 //! Meets requirements of a forward iterator for STL.
737 /** Value is either the T or const T type of the container.
738     @ingroup containers */
739 template<typename Container, typename Value>
740 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
741         public std::iterator<std::forward_iterator_tag,Value> {
742 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
743     template<typename T, class A>
744     friend class ::tbb::strict_ppl::concurrent_queue;
745 #else
746 public: // workaround for MSVC
747 #endif 
748     //! Construct iterator pointing to head of queue.
749     concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
750         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
751     {
752     }
753
754 public:
755     concurrent_queue_iterator() {}
756
757     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
758         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
759     {}
760
761     //! Iterator assignment
762     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
763         this->assign(other);
764         return *this;
765     }
766
767     //! Reference to current item 
768     Value& operator*() const {
769         return *static_cast<Value*>(this->my_item);
770     }
771
772     Value* operator->() const {return &operator*();}
773
774     //! Advance to next item in queue
775     concurrent_queue_iterator& operator++() {
776         this->advance();
777         return *this;
778     }
779
780     //! Post increment
781     Value* operator++(int) {
782         Value* result = &operator*();
783         operator++();
784         return result;
785     }
786 }; // concurrent_queue_iterator
787
788
789 template<typename C, typename T, typename U>
790 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
791     return i.my_item==j.my_item;
792 }
793
794 template<typename C, typename T, typename U>
795 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
796     return i.my_item!=j.my_item;
797 }
798
799 } // namespace internal
800
801 //! @endcond
802
803 } // namespace strict_ppl
804
805 //! @cond INTERNAL
806 namespace internal {
807
808 class concurrent_queue_rep;
809 class concurrent_queue_iterator_rep;
810 class concurrent_queue_iterator_base_v3;
811 template<typename Container, typename Value> class concurrent_queue_iterator;
812
813 //! For internal use only.
814 /** Type-independent portion of concurrent_queue.
815     @ingroup containers */
816 class concurrent_queue_base_v3: no_copy {
817     //! Internal representation
818     concurrent_queue_rep* my_rep;
819
820     friend class concurrent_queue_rep;
821     friend struct micro_queue;
822     friend class micro_queue_pop_finalizer;
823     friend class concurrent_queue_iterator_rep;
824     friend class concurrent_queue_iterator_base_v3;
825 protected:
826     //! Prefix on a page
827     struct page {
828         page* next;
829         uintptr_t mask; 
830     };
831
832     //! Capacity of the queue
833     ptrdiff_t my_capacity;
834    
835     //! Always a power of 2
836     size_t items_per_page;
837
838     //! Size of an item
839     size_t item_size;
840
841 #if __TBB_GCC_3_3_PROTECTED_BROKEN
842 public:
843 #endif
844     template<typename T>
845     struct padded_page: page {
846         //! Not defined anywhere - exists to quiet warnings.
847         padded_page(); 
848         //! Not defined anywhere - exists to quiet warnings.
849         void operator=( const padded_page& );
850         //! Must be last field.
851         T last;
852     };
853
854 private:
855     virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
856     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
857 protected:
858     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
859     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
860
861     //! Enqueue item at tail of queue
862     void __TBB_EXPORTED_METHOD internal_push( const void* src );
863
864     //! Dequeue item from head of queue
865     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
866
867     //! Attempt to enqueue item onto queue.
868     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
869
870     //! Attempt to dequeue item from queue.
871     /** NULL if there was no item to dequeue. */
872     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
873
874     //! Get size of queue
875     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
876
877     //! Check if the queue is emtpy
878     bool __TBB_EXPORTED_METHOD internal_empty() const;
879
880     //! Set the queue capacity
881     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
882
883     //! custom allocator
884     virtual page *allocate_page() = 0;
885
886     //! custom de-allocator
887     virtual void deallocate_page( page *p ) = 0;
888
889     //! free any remaining pages
890     /* note that the name may be misleading, but it remains so due to a historical accident. */
891     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
892
893     //! throw an exception
894     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
895
896     //! copy internal representation
897     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
898
899 private:
900     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
901 };
902
903 //! Type-independent portion of concurrent_queue_iterator.
904 /** @ingroup containers */
905 class concurrent_queue_iterator_base_v3 {
906     //! concurrent_queue over which we are iterating.
907     /** NULL if one past last element in queue. */
908     concurrent_queue_iterator_rep* my_rep;
909
910     template<typename C, typename T, typename U>
911     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
912
913     template<typename C, typename T, typename U>
914     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
915
916     void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
917 protected:
918     //! Pointer to current item
919     void* my_item;
920
921     //! Default constructor
922     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
923
924     //! Copy constructor
925     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
926         assign(i);
927     }
928
929     //! Obsolete entry point for constructing iterator pointing to head of queue.
930     /** Does not work correctly for SSE types. */
931     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
932
933     //! Construct iterator pointing to head of queue.
934     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
935
936     //! Assignment
937     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
938
939     //! Advance iterator one step towards tail of queue.
940     void __TBB_EXPORTED_METHOD advance();
941
942     //! Destructor
943     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
944 };
945
946 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
947
948 //! Meets requirements of a forward iterator for STL.
949 /** Value is either the T or const T type of the container.
950     @ingroup containers */
951 template<typename Container, typename Value>
952 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
953         public std::iterator<std::forward_iterator_tag,Value> {
954
955 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
956     template<typename T, class A>
957     friend class ::tbb::concurrent_bounded_queue;
958
959     template<typename T, class A>
960     friend class ::tbb::deprecated::concurrent_queue;
961 #else
962 public: // workaround for MSVC
963 #endif 
964     //! Construct iterator pointing to head of queue.
965     concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
966         concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
967     {
968     }
969
970 public:
971     concurrent_queue_iterator() {}
972
973     /** If Value==Container::value_type, then this routine is the copy constructor. 
974         If Value==const Container::value_type, then this routine is a conversion constructor. */
975     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
976         concurrent_queue_iterator_base_v3(other)
977     {}
978
979     //! Iterator assignment
980     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
981         assign(other);
982         return *this;
983     }
984
985     //! Reference to current item 
986     Value& operator*() const {
987         return *static_cast<Value*>(my_item);
988     }
989
990     Value* operator->() const {return &operator*();}
991
992     //! Advance to next item in queue
993     concurrent_queue_iterator& operator++() {
994         advance();
995         return *this;
996     }
997
998     //! Post increment
999     Value* operator++(int) {
1000         Value* result = &operator*();
1001         operator++();
1002         return result;
1003     }
1004 }; // concurrent_queue_iterator
1005
1006
1007 template<typename C, typename T, typename U>
1008 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1009     return i.my_item==j.my_item;
1010 }
1011
1012 template<typename C, typename T, typename U>
1013 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1014     return i.my_item!=j.my_item;
1015 }
1016
1017 } // namespace internal;
1018
1019 //! @endcond
1020
1021 } // namespace tbb
1022
1023 #endif /* __TBB__concurrent_queue_impl_H */