]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/internal/_concurrent_queue_impl.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / internal / _concurrent_queue_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_concurrent_queue_internal_H
30 #define __TBB_concurrent_queue_internal_H
31
32 #include "../tbb_stddef.h"
33 #include "../tbb_machine.h"
34 #include "../atomic.h"
35 #include "../spin_mutex.h"
36 #include "../cache_aligned_allocator.h"
37 #include "../tbb_exception.h"
38 #include "../tbb_profiling.h"
39 #include <new>
40
41 #if !TBB_USE_EXCEPTIONS && _MSC_VER
42     // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
43     #pragma warning (push)
44     #pragma warning (disable: 4530)
45 #endif
46
47 #include <iterator>
48
49 #if !TBB_USE_EXCEPTIONS && _MSC_VER
50     #pragma warning (pop)
51 #endif
52
53 namespace tbb {
54
55 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
56
57 // forward declaration
58 namespace strict_ppl {
59 template<typename T, typename A> class concurrent_queue;
60 }
61
62 template<typename T, typename A> class concurrent_bounded_queue;
63
64 namespace deprecated {
65 template<typename T, typename A> class concurrent_queue;
66 }
67 #endif
68
69 //! For internal use only.
70 namespace strict_ppl {
71
72 //! @cond INTERNAL
73 namespace internal {
74
75 using namespace tbb::internal;
76
77 typedef size_t ticket;
78
79 template<typename T> class micro_queue ;
80 template<typename T> class micro_queue_pop_finalizer ;
81 template<typename T> class concurrent_queue_base_v3;
82
83 //! parts of concurrent_queue_rep that do not have references to micro_queue
84 /**
85  * For internal use only.
86  */
87 struct concurrent_queue_rep_base : no_copy {
88     template<typename T> friend class micro_queue;
89     template<typename T> friend class concurrent_queue_base_v3;
90
91 protected:
92     //! Approximately n_queue/golden ratio
93     static const size_t phi = 3;
94
95 public:
96     // must be power of 2
97     static const size_t n_queue = 8;
98
99     //! Prefix on a page
100     struct page {
101         page* next;
102         uintptr_t mask; 
103     };
104
105     atomic<ticket> head_counter;
106     char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
107     atomic<ticket> tail_counter;
108     char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
109
110     //! Always a power of 2
111     size_t items_per_page;
112
113     //! Size of an item
114     size_t item_size;
115
116     //! number of invalid entries in the queue
117     atomic<size_t> n_invalid_entries;
118
119     char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
120 } ;
121
122 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
123     return uintptr_t(p)>1;
124 }
125
126 //! Abstract class to define interface for page allocation/deallocation
127 /**
128  * For internal use only.
129  */
130 class concurrent_queue_page_allocator
131 {
132     template<typename T> friend class micro_queue ;
133     template<typename T> friend class micro_queue_pop_finalizer ;
134 protected:
135     virtual ~concurrent_queue_page_allocator() {}
136 private:
137     virtual concurrent_queue_rep_base::page* allocate_page() = 0;
138     virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
139 } ;
140
141 #if _MSC_VER && !defined(__INTEL_COMPILER)
142 // unary minus operator applied to unsigned type, result still unsigned
143 #pragma warning( push )
144 #pragma warning( disable: 4146 )
145 #endif
146
147 //! A queue using simple locking.
148 /** For efficiency, this class has no constructor.
149     The caller is expected to zero-initialize it. */
150 template<typename T>
151 class micro_queue : no_copy {
152     typedef concurrent_queue_rep_base::page page;
153
154     //! Class used to ensure exception-safety of method "pop" 
155     class destroyer: no_copy {
156         T& my_value;
157     public:
158         destroyer( T& value ) : my_value(value) {}
159         ~destroyer() {my_value.~T();}          
160     };
161
162     void copy_item( page& dst, size_t index, const void* src ) {
163         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
164     }
165
166     void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
167         new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
168     }
169
170     void assign_and_destroy_item( void* dst, page& src, size_t index ) {
171         T& from = get_ref(src,index);
172         destroyer d(from);
173         *static_cast<T*>(dst) = from;
174     }
175
176     void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
177
178 public:
179     friend class micro_queue_pop_finalizer<T>;
180
181     struct padded_page: page {
182         //! Not defined anywhere - exists to quiet warnings.
183         padded_page(); 
184         //! Not defined anywhere - exists to quiet warnings.
185         void operator=( const padded_page& );
186         //! Must be last field.
187         T last;
188     };
189
190     static T& get_ref( page& p, size_t index ) {
191         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
192     }
193
194     atomic<page*> head_page;
195     atomic<ticket> head_counter;
196
197     atomic<page*> tail_page;
198     atomic<ticket> tail_counter;
199
200     spin_mutex page_mutex;
201     
202     void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
203
204     bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
205
206     micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
207
208     page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
209
210     void invalidate_page_and_rethrow( ticket k ) ;
211 };
212
213 template<typename T>
214 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
215     atomic_backoff backoff;
216     do {
217         backoff.pause();
218         if( counter&1 ) {
219             ++rb.n_invalid_entries;
220             throw_exception( eid_bad_last_alloc );
221         }
222     } while( counter!=k ) ;
223 }
224
225 template<typename T>
226 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
227     k &= -concurrent_queue_rep_base::n_queue;
228     page* p = NULL;
229     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
230     if( !index ) {
231         __TBB_TRY {
232             concurrent_queue_page_allocator& pa = base;
233             p = pa.allocate_page();
234         } __TBB_CATCH (...) {
235             ++base.my_rep->n_invalid_entries;
236             invalidate_page_and_rethrow( k );
237         }
238         p->mask = 0;
239         p->next = NULL;
240     }
241
242     if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
243     call_itt_notify(acquired, &tail_counter);
244         
245     if( p ) {
246         spin_mutex::scoped_lock lock( page_mutex );
247         page* q = tail_page;
248         if( is_valid_page(q) )
249             q->next = p;
250         else
251             head_page = p; 
252         tail_page = p;
253     } else {
254         p = tail_page;
255     }
256     __TBB_TRY {
257         copy_item( *p, index, item );
258         // If no exception was thrown, mark item as present.
259         itt_hide_store_word(p->mask,  p->mask | uintptr_t(1)<<index);
260         call_itt_notify(releasing, &tail_counter);
261         tail_counter += concurrent_queue_rep_base::n_queue; 
262     } __TBB_CATCH (...) {
263         ++base.my_rep->n_invalid_entries;
264         call_itt_notify(releasing, &tail_counter);
265         tail_counter += concurrent_queue_rep_base::n_queue; 
266         __TBB_RETHROW();
267     }
268 }
269
270 template<typename T>
271 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
272     k &= -concurrent_queue_rep_base::n_queue;
273     if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
274     call_itt_notify(acquired, &head_counter);
275     if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
276     call_itt_notify(acquired, &tail_counter);
277     page& p = *head_page;
278     __TBB_ASSERT( &p, NULL );
279     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
280     bool success = false; 
281     {
282         micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); 
283         if( p.mask & uintptr_t(1)<<index ) {
284             success = true;
285             assign_and_destroy_item( dst, p, index );
286         } else {
287             --base.my_rep->n_invalid_entries;
288         }
289     }
290     return success;
291 }
292
293 template<typename T>
294 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
295     head_counter = src.head_counter;
296     tail_counter = src.tail_counter;
297     page_mutex   = src.page_mutex;
298
299     const page* srcp = src.head_page;
300     if( is_valid_page(srcp) ) {
301         ticket g_index = head_counter;
302         __TBB_TRY {
303             size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
304             size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
305             size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
306
307             head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
308             page* cur_page = head_page;
309
310             if( srcp != src.tail_page ) {
311                 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
312                     cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
313                     cur_page = cur_page->next;
314                 }
315
316                 __TBB_ASSERT( srcp==src.tail_page, NULL );
317                 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
318                 if( last_index==0 ) last_index = base.my_rep->items_per_page;
319
320                 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
321                 cur_page = cur_page->next;
322             }
323             tail_page = cur_page;
324         } __TBB_CATCH (...) {
325             invalidate_page_and_rethrow( g_index );
326         }
327     } else {
328         head_page = tail_page = NULL;
329     }
330     return *this;
331 }
332
333 template<typename T>
334 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
335     // Append an invalid page at address 1 so that no more pushes are allowed.
336     page* invalid_page = (page*)uintptr_t(1);
337     {
338         spin_mutex::scoped_lock lock( page_mutex );
339         itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
340         page* q = tail_page;
341         if( is_valid_page(q) )
342             q->next = invalid_page;
343         else
344             head_page = invalid_page;
345         tail_page = invalid_page;
346     }
347     __TBB_RETHROW();
348 }
349
350 template<typename T>
351 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
352     concurrent_queue_page_allocator& pa = base;
353     page* new_page = pa.allocate_page();
354     new_page->next = NULL;
355     new_page->mask = src_page->mask;
356     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
357         if( new_page->mask & uintptr_t(1)<<begin_in_page )
358             copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
359     return new_page;
360 }
361
362 template<typename T>
363 class micro_queue_pop_finalizer: no_copy {
364     typedef concurrent_queue_rep_base::page page;
365     ticket my_ticket;
366     micro_queue<T>& my_queue;
367     page* my_page; 
368     concurrent_queue_page_allocator& allocator;
369 public:
370     micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
371         my_ticket(k), my_queue(queue), my_page(p), allocator(b)
372     {}
373     ~micro_queue_pop_finalizer() ;
374 };
375
376 template<typename T>
377 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
378     page* p = my_page;
379     if( is_valid_page(p) ) {
380         spin_mutex::scoped_lock lock( my_queue.page_mutex );
381         page* q = p->next;
382         my_queue.head_page = q;
383         if( !is_valid_page(q) ) {
384             my_queue.tail_page = NULL;
385         }
386     }
387     itt_store_word_with_release(my_queue.head_counter, my_ticket);
388     if( is_valid_page(p) ) {
389         allocator.deallocate_page( p );
390     }
391 }
392
393 #if _MSC_VER && !defined(__INTEL_COMPILER)
394 #pragma warning( pop )
395 #endif // warning 4146 is back
396
397 template<typename T> class concurrent_queue_iterator_rep ;
398 template<typename T> class concurrent_queue_iterator_base_v3;
399
400 //! representation of concurrent_queue_base
401 /**
402  * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
403  */
404 template<typename T>
405 struct concurrent_queue_rep : public concurrent_queue_rep_base {
406     micro_queue<T> array[n_queue];
407
408     //! Map ticket to an array index
409     static size_t index( ticket k ) {
410         return k*phi%n_queue;
411     }
412
413     micro_queue<T>& choose( ticket k ) {
414         // The formula here approximates LRU in a cache-oblivious way.
415         return array[index(k)];
416     }
417 };
418
419 //! base class of concurrent_queue
420 /**
421  * The class implements the interface defined by concurrent_queue_page_allocator
422  * and has a pointer to an instance of concurrent_queue_rep.
423  */
424 template<typename T>
425 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
426     //! Internal representation
427     concurrent_queue_rep<T>* my_rep;
428
429     friend struct concurrent_queue_rep<T>;
430     friend class micro_queue<T>;
431     friend class concurrent_queue_iterator_rep<T>;
432     friend class concurrent_queue_iterator_base_v3<T>;
433
434 protected:
435     typedef typename concurrent_queue_rep<T>::page page;
436
437 private:
438     typedef typename micro_queue<T>::padded_page padded_page;
439
440     /* override */ virtual page *allocate_page() {
441         concurrent_queue_rep<T>& r = *my_rep;
442         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
443         return reinterpret_cast<page*>(allocate_block ( n ));
444     }
445
446     /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
447         concurrent_queue_rep<T>& r = *my_rep;
448         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
449         deallocate_block( reinterpret_cast<void*>(p), n );
450     }
451
452     //! custom allocator
453     virtual void *allocate_block( size_t n ) = 0;
454
455     //! custom de-allocator
456     virtual void deallocate_block( void *p, size_t n ) = 0;
457
458 protected:
459     concurrent_queue_base_v3();
460
461     /* override */ virtual ~concurrent_queue_base_v3() {
462 #if TBB_USE_ASSERT
463         size_t nq = my_rep->n_queue;
464         for( size_t i=0; i<nq; i++ )
465             __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
466 #endif /* TBB_USE_ASSERT */
467         cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
468     }
469
470     //! Enqueue item at tail of queue
471     void internal_push( const void* src ) {
472         concurrent_queue_rep<T>& r = *my_rep;
473         ticket k = r.tail_counter++;
474         r.choose(k).push( src, k, *this );
475     }
476
477     //! Attempt to dequeue item from queue.
478     /** NULL if there was no item to dequeue. */
479     bool internal_try_pop( void* dst ) ;
480
481     //! Get size of queue; result may be invalid if queue is modified concurrently
482     size_t internal_size() const ;
483
484     //! check if the queue is empty; thread safe
485     bool internal_empty() const ;
486
487     //! free any remaining pages
488     /* note that the name may be misleading, but it remains so due to a historical accident. */
489     void internal_finish_clear() ;
490
491     //! Obsolete
492     void internal_throw_exception() const {
493         throw_exception( eid_bad_alloc );
494     }
495
496     //! copy internal representation
497     void assign( const concurrent_queue_base_v3& src ) ;
498 };
499
500 template<typename T>
501 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
502     const size_t item_size = sizeof(T);
503     my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
504     __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
505     __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
506     __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
507     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
508     memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
509     my_rep->item_size = item_size;
510     my_rep->items_per_page = item_size<=8 ? 32 :
511                              item_size<=16 ? 16 : 
512                              item_size<=32 ? 8 :
513                              item_size<=64 ? 4 :
514                              item_size<=128 ? 2 :
515                              1;
516 }
517
518 template<typename T>
519 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
520     concurrent_queue_rep<T>& r = *my_rep;
521     ticket k;
522     do {
523         k = r.head_counter;
524         for(;;) {
525             if( r.tail_counter<=k ) {
526                 // Queue is empty 
527                 return false;
528             }
529             // Queue had item with ticket k when we looked.  Attempt to get that item.
530             ticket tk=k;
531 #if defined(_MSC_VER) && defined(_Wp64)
532     #pragma warning (push)
533     #pragma warning (disable: 4267)
534 #endif
535             k = r.head_counter.compare_and_swap( tk+1, tk );
536 #if defined(_MSC_VER) && defined(_Wp64)
537     #pragma warning (pop)
538 #endif
539             if( k==tk )
540                 break;
541             // Another thread snatched the item, retry.
542         }
543     } while( !r.choose( k ).pop( dst, k, *this ) );
544     return true;
545 }
546
547 template<typename T>
548 size_t concurrent_queue_base_v3<T>::internal_size() const {
549     concurrent_queue_rep<T>& r = *my_rep;
550     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
551     ticket hc = r.head_counter;
552     size_t nie = r.n_invalid_entries;
553     ticket tc = r.tail_counter;
554     __TBB_ASSERT( hc!=tc || !nie, NULL );
555     ptrdiff_t sz = tc-hc-nie;
556     return sz<0 ? 0 :  size_t(sz);
557 }
558
559 template<typename T>
560 bool concurrent_queue_base_v3<T>::internal_empty() const {
561     concurrent_queue_rep<T>& r = *my_rep;
562     ticket tc = r.tail_counter;
563     ticket hc = r.head_counter;
564     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
565     return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
566 }
567
568 template<typename T>
569 void concurrent_queue_base_v3<T>::internal_finish_clear() {
570     concurrent_queue_rep<T>& r = *my_rep;
571     size_t nq = r.n_queue;
572     for( size_t i=0; i<nq; ++i ) {
573         page* tp = r.array[i].tail_page;
574         if( is_valid_page(tp) ) {
575             __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
576             deallocate_page( tp );
577             r.array[i].tail_page = NULL;
578         } else 
579             __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
580     }
581 }
582
583 template<typename T>
584 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
585     concurrent_queue_rep<T>& r = *my_rep;
586     r.items_per_page = src.my_rep->items_per_page;
587
588     // copy concurrent_queue_rep.
589     r.head_counter = src.my_rep->head_counter;
590     r.tail_counter = src.my_rep->tail_counter;
591     r.n_invalid_entries = src.my_rep->n_invalid_entries;
592
593     // copy micro_queues
594     for( size_t i = 0; i<r.n_queue; ++i )
595         r.array[i].assign( src.my_rep->array[i], *this);
596
597     __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, 
598             "the source concurrent queue should not be concurrently modified." );
599 }
600
601 template<typename Container, typename Value> class concurrent_queue_iterator;
602
603 template<typename T>
604 class concurrent_queue_iterator_rep: no_assign {
605     typedef typename micro_queue<T>::padded_page padded_page;
606 public:
607     ticket head_counter;
608     const concurrent_queue_base_v3<T>& my_queue;
609     typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
610     concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
611         head_counter(queue.my_rep->head_counter),
612         my_queue(queue)
613     {
614         for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
615             array[k] = queue.my_rep->array[k].head_page;
616     }
617
618     //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
619     bool get_item( T*& item, size_t k ) ;
620 };
621
622 template<typename T>
623 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
624     if( k==my_queue.my_rep->tail_counter ) {
625         item = NULL;
626         return true;
627     } else {
628         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
629         __TBB_ASSERT(p,NULL);
630         size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
631         item = &micro_queue<T>::get_ref(*p,i);
632         return (p->mask & uintptr_t(1)<<i)!=0;
633     }
634 }
635
636 //! Constness-independent portion of concurrent_queue_iterator.
637 /** @ingroup containers */
638 template<typename Value>
639 class concurrent_queue_iterator_base_v3 : no_assign {
640     //! Represents concurrent_queue over which we are iterating.
641     /** NULL if one past last element in queue. */
642     concurrent_queue_iterator_rep<Value>* my_rep;
643
644     template<typename C, typename T, typename U>
645     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
646
647     template<typename C, typename T, typename U>
648     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
649 protected:
650     //! Pointer to current item
651     Value* my_item;
652
653     //! Default constructor
654     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
655 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
656         __TBB_compiler_fence();
657 #endif
658     }
659
660     //! Copy constructor
661     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
662     : no_assign(), my_rep(NULL), my_item(NULL) {
663         assign(i);
664     }
665
666     //! Construct iterator pointing to head of queue.
667     concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
668
669     //! Assignment
670     void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
671
672     //! Advance iterator one step towards tail of queue.
673     void advance() ;
674
675     //! Destructor
676     ~concurrent_queue_iterator_base_v3() {
677         cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
678         my_rep = NULL;
679     }
680 };
681
682 template<typename Value>
683 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
684     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
685     new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
686     size_t k = my_rep->head_counter;
687     if( !my_rep->get_item(my_item, k) ) advance();
688 }
689
690 template<typename Value>
691 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
692     if( my_rep!=other.my_rep ) {
693         if( my_rep ) {
694             cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
695             my_rep = NULL;
696         }
697         if( other.my_rep ) {
698             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
699             new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
700         }
701     }
702     my_item = other.my_item;
703 }
704
705 template<typename Value>
706 void concurrent_queue_iterator_base_v3<Value>::advance() {
707     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );  
708     size_t k = my_rep->head_counter;
709     const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
710 #if TBB_USE_ASSERT
711     Value* tmp;
712     my_rep->get_item(tmp,k);
713     __TBB_ASSERT( my_item==tmp, NULL );
714 #endif /* TBB_USE_ASSERT */
715     size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
716     if( i==queue.my_rep->items_per_page-1 ) {
717         typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
718         root = root->next;
719     }
720     // advance k
721     my_rep->head_counter = ++k;
722     if( !my_rep->get_item(my_item, k) ) advance();
723 }
724
725 //! Similar to C++0x std::remove_cv
726 /** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
727 template<typename T> struct tbb_remove_cv {typedef T type;};
728 template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
729 template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
730 template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
731
732 //! Meets requirements of a forward iterator for STL.
733 /** Value is either the T or const T type of the container.
734     @ingroup containers */
735 template<typename Container, typename Value>
736 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
737         public std::iterator<std::forward_iterator_tag,Value> {
738 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
739     template<typename T, class A>
740     friend class ::tbb::strict_ppl::concurrent_queue;
741 #else
742 public: // workaround for MSVC
743 #endif 
744     //! Construct iterator pointing to head of queue.
745     concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
746         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
747     {
748     }
749
750 public:
751     concurrent_queue_iterator() {}
752
753     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
754         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
755     {}
756
757     //! Iterator assignment
758     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
759         this->assign(other);
760         return *this;
761     }
762
763     //! Reference to current item 
764     Value& operator*() const {
765         return *static_cast<Value*>(this->my_item);
766     }
767
768     Value* operator->() const {return &operator*();}
769
770     //! Advance to next item in queue
771     concurrent_queue_iterator& operator++() {
772         this->advance();
773         return *this;
774     }
775
776     //! Post increment
777     Value* operator++(int) {
778         Value* result = &operator*();
779         operator++();
780         return result;
781     }
782 }; // concurrent_queue_iterator
783
784
785 template<typename C, typename T, typename U>
786 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
787     return i.my_item==j.my_item;
788 }
789
790 template<typename C, typename T, typename U>
791 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
792     return i.my_item!=j.my_item;
793 }
794
795 } // namespace internal
796
797 //! @endcond
798
799 } // namespace strict_ppl
800
801 //! @cond INTERNAL
802 namespace internal {
803
804 class concurrent_queue_rep;
805 class concurrent_queue_iterator_rep;
806 class concurrent_queue_iterator_base_v3;
807 template<typename Container, typename Value> class concurrent_queue_iterator;
808
809 //! For internal use only.
810 /** Type-independent portion of concurrent_queue.
811     @ingroup containers */
812 class concurrent_queue_base_v3: no_copy {
813     //! Internal representation
814     concurrent_queue_rep* my_rep;
815
816     friend class concurrent_queue_rep;
817     friend struct micro_queue;
818     friend class micro_queue_pop_finalizer;
819     friend class concurrent_queue_iterator_rep;
820     friend class concurrent_queue_iterator_base_v3;
821 protected:
822     //! Prefix on a page
823     struct page {
824         page* next;
825         uintptr_t mask; 
826     };
827
828     //! Capacity of the queue
829     ptrdiff_t my_capacity;
830    
831     //! Always a power of 2
832     size_t items_per_page;
833
834     //! Size of an item
835     size_t item_size;
836
837 #if __TBB_GCC_3_3_PROTECTED_BROKEN
838 public:
839 #endif
840     template<typename T>
841     struct padded_page: page {
842         //! Not defined anywhere - exists to quiet warnings.
843         padded_page(); 
844         //! Not defined anywhere - exists to quiet warnings.
845         void operator=( const padded_page& );
846         //! Must be last field.
847         T last;
848     };
849
850 private:
851     virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
852     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
853 protected:
854     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
855     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
856
857     //! Enqueue item at tail of queue
858     void __TBB_EXPORTED_METHOD internal_push( const void* src );
859
860     //! Dequeue item from head of queue
861     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
862
863     //! Attempt to enqueue item onto queue.
864     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
865
866     //! Attempt to dequeue item from queue.
867     /** NULL if there was no item to dequeue. */
868     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
869
870     //! Get size of queue
871     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
872
873     //! Check if the queue is emtpy
874     bool __TBB_EXPORTED_METHOD internal_empty() const;
875
876     //! Set the queue capacity
877     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
878
879     //! custom allocator
880     virtual page *allocate_page() = 0;
881
882     //! custom de-allocator
883     virtual void deallocate_page( page *p ) = 0;
884
885     //! free any remaining pages
886     /* note that the name may be misleading, but it remains so due to a historical accident. */
887     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
888
889     //! throw an exception
890     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
891
892     //! copy internal representation
893     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
894
895 private:
896     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
897 };
898
899 //! Type-independent portion of concurrent_queue_iterator.
900 /** @ingroup containers */
901 class concurrent_queue_iterator_base_v3 {
902     //! concurrent_queue over which we are iterating.
903     /** NULL if one past last element in queue. */
904     concurrent_queue_iterator_rep* my_rep;
905
906     template<typename C, typename T, typename U>
907     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
908
909     template<typename C, typename T, typename U>
910     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
911
912     void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
913 protected:
914     //! Pointer to current item
915     void* my_item;
916
917     //! Default constructor
918     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
919
920     //! Copy constructor
921     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
922         assign(i);
923     }
924
925     //! Obsolete entry point for constructing iterator pointing to head of queue.
926     /** Does not work correctly for SSE types. */
927     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
928
929     //! Construct iterator pointing to head of queue.
930     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
931
932     //! Assignment
933     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
934
935     //! Advance iterator one step towards tail of queue.
936     void __TBB_EXPORTED_METHOD advance();
937
938     //! Destructor
939     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
940 };
941
942 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
943
944 //! Meets requirements of a forward iterator for STL.
945 /** Value is either the T or const T type of the container.
946     @ingroup containers */
947 template<typename Container, typename Value>
948 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
949         public std::iterator<std::forward_iterator_tag,Value> {
950
951 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
952     template<typename T, class A>
953     friend class ::tbb::concurrent_bounded_queue;
954
955     template<typename T, class A>
956     friend class ::tbb::deprecated::concurrent_queue;
957 #else
958 public: // workaround for MSVC
959 #endif 
960     //! Construct iterator pointing to head of queue.
961     concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
962         concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
963     {
964     }
965
966 public:
967     concurrent_queue_iterator() {}
968
969     /** If Value==Container::value_type, then this routine is the copy constructor. 
970         If Value==const Container::value_type, then this routine is a conversion constructor. */
971     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
972         concurrent_queue_iterator_base_v3(other)
973     {}
974
975     //! Iterator assignment
976     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
977         assign(other);
978         return *this;
979     }
980
981     //! Reference to current item 
982     Value& operator*() const {
983         return *static_cast<Value*>(my_item);
984     }
985
986     Value* operator->() const {return &operator*();}
987
988     //! Advance to next item in queue
989     concurrent_queue_iterator& operator++() {
990         advance();
991         return *this;
992     }
993
994     //! Post increment
995     Value* operator++(int) {
996         Value* result = &operator*();
997         operator++();
998         return result;
999     }
1000 }; // concurrent_queue_iterator
1001
1002
1003 template<typename C, typename T, typename U>
1004 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1005     return i.my_item==j.my_item;
1006 }
1007
1008 template<typename C, typename T, typename U>
1009 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1010     return i.my_item!=j.my_item;
1011 }
1012
1013 } // namespace internal;
1014
1015 //! @endcond
1016
1017 } // namespace tbb
1018
1019 #endif /* __TBB_concurrent_queue_internal_H */