]> git.sesse.net Git - casparcg/blob - tbb30_20100406oss/include/tbb/_concurrent_queue_internal.h
a7bbbb87058c0a823d9a3be83cf362fe929fde44
[casparcg] / tbb30_20100406oss / include / tbb / _concurrent_queue_internal.h
1 /*
2     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_concurrent_queue_internal_H
30 #define __TBB_concurrent_queue_internal_H
31
32 #include "tbb_stddef.h"
33 #include "tbb_machine.h"
34 #include "atomic.h"
35 #include "spin_mutex.h"
36 #include "cache_aligned_allocator.h"
37 #include "tbb_exception.h"
38 #include <new>
39
40 #if !TBB_USE_EXCEPTIONS && _MSC_VER
41     // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
42     #pragma warning (push)
43     #pragma warning (disable: 4530)
44 #endif
45
46 #include <iterator>
47
48 #if !TBB_USE_EXCEPTIONS && _MSC_VER
49     #pragma warning (pop)
50 #endif
51
52
53 namespace tbb {
54
55 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
56
57 // forward declaration
58 namespace strict_ppl {
59 template<typename T, typename A> class concurrent_queue;
60 }
61
62 template<typename T, typename A> class concurrent_bounded_queue;
63
64 namespace deprecated {
65 template<typename T, typename A> class concurrent_queue;
66 }
67 #endif
68
69 //! For internal use only.
70 namespace strict_ppl {
71
72 //! @cond INTERNAL
73 namespace internal {
74
75 using namespace tbb::internal;
76
77 typedef size_t ticket;
78
79 template<typename T> class micro_queue ;
80 template<typename T> class micro_queue_pop_finalizer ;
81 template<typename T> class concurrent_queue_base_v3;
82
83 //! parts of concurrent_queue_rep that do not have references to micro_queue
84 /**
85  * For internal use only.
86  */
87 struct concurrent_queue_rep_base : no_copy {
88     template<typename T> friend class micro_queue;
89     template<typename T> friend class concurrent_queue_base_v3;
90
91 protected:
92     //! Approximately n_queue/golden ratio
93     static const size_t phi = 3;
94
95 public:
96     // must be power of 2
97     static const size_t n_queue = 8;
98
99     //! Prefix on a page
100     struct page {
101         page* next;
102         uintptr_t mask; 
103     };
104
105     atomic<ticket> head_counter;
106     char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
107     atomic<ticket> tail_counter;
108     char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
109
110     //! Always a power of 2
111     size_t items_per_page;
112
113     //! Size of an item
114     size_t item_size;
115
116     //! number of invalid entries in the queue
117     atomic<size_t> n_invalid_entries;
118
119     char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
120 } ;
121
122 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
123     return uintptr_t(p)>1;
124 }
125
126 //! Abstract class to define interface for page allocation/deallocation
127 /**
128  * For internal use only.
129  */
130 class concurrent_queue_page_allocator
131 {
132     template<typename T> friend class micro_queue ;
133     template<typename T> friend class micro_queue_pop_finalizer ;
134 protected:
135     virtual ~concurrent_queue_page_allocator() {}
136 private:
137     virtual concurrent_queue_rep_base::page* allocate_page() = 0;
138     virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
139 } ;
140
141 #if _MSC_VER && !defined(__INTEL_COMPILER)
142 // unary minus operator applied to unsigned type, result still unsigned
143 #pragma warning( push )
144 #pragma warning( disable: 4146 )
145 #endif
146
147 //! A queue using simple locking.
148 /** For efficient, this class has no constructor.  
149     The caller is expected to zero-initialize it. */
150 template<typename T>
151 class micro_queue : no_copy {
152     typedef concurrent_queue_rep_base::page page;
153
154     //! Class used to ensure exception-safety of method "pop" 
155     class destroyer: no_copy {
156         T& my_value;
157     public:
158         destroyer( T& value ) : my_value(value) {}
159         ~destroyer() {my_value.~T();}          
160     };
161
162     void copy_item( page& dst, size_t index, const void* src ) {
163         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
164     }
165
166     void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
167         new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
168     }
169
170     void assign_and_destroy_item( void* dst, page& src, size_t index ) {
171         T& from = get_ref(src,index);
172         destroyer d(from);
173         *static_cast<T*>(dst) = from;
174     }
175
176     void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
177
178 public:
179     friend class micro_queue_pop_finalizer<T>;
180
181     struct padded_page: page {
182         //! Not defined anywhere - exists to quiet warnings.
183         padded_page(); 
184         //! Not defined anywhere - exists to quiet warnings.
185         void operator=( const padded_page& );
186         //! Must be last field.
187         T last;
188     };
189
190     static T& get_ref( page& p, size_t index ) {
191         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
192     }
193
194     atomic<page*> head_page;
195     atomic<ticket> head_counter;
196
197     atomic<page*> tail_page;
198     atomic<ticket> tail_counter;
199
200     spin_mutex page_mutex;
201     
202     void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
203
204     bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
205
206     micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
207
208     page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
209
210     void invalidate_page_and_rethrow( ticket k ) ;
211 };
212
213 template<typename T>
214 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
215     atomic_backoff backoff;
216     do {
217         backoff.pause();
218         if( counter&1 ) {
219             ++rb.n_invalid_entries;
220             throw_exception( eid_bad_last_alloc );
221         }
222     } while( counter!=k ) ;
223 }
224
225 template<typename T>
226 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
227     k &= -concurrent_queue_rep_base::n_queue;
228     page* p = NULL;
229     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
230     if( !index ) {
231         __TBB_TRY {
232             concurrent_queue_page_allocator& pa = base;
233             p = pa.allocate_page();
234         } __TBB_CATCH (...) {
235             ++base.my_rep->n_invalid_entries;
236             invalidate_page_and_rethrow( k );
237         }
238         p->mask = 0;
239         p->next = NULL;
240     }
241     
242     if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
243         
244     if( p ) {
245         spin_mutex::scoped_lock lock( page_mutex );
246         page* q = tail_page;
247         if( is_valid_page(q) )
248             q->next = p;
249         else
250             head_page = p; 
251         tail_page = p;
252     } else {
253         p = tail_page;
254     }
255    
256     __TBB_TRY {
257         copy_item( *p, index, item );
258         // If no exception was thrown, mark item as present.
259         p->mask |= uintptr_t(1)<<index;
260         tail_counter += concurrent_queue_rep_base::n_queue; 
261     } __TBB_CATCH (...) {
262         ++base.my_rep->n_invalid_entries;
263         tail_counter += concurrent_queue_rep_base::n_queue; 
264         __TBB_RETHROW();
265     }
266 }
267
268 template<typename T>
269 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
270     k &= -concurrent_queue_rep_base::n_queue;
271     if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
272     if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
273     page& p = *head_page;
274     __TBB_ASSERT( &p, NULL );
275     size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
276     bool success = false; 
277     {
278         micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); 
279         if( p.mask & uintptr_t(1)<<index ) {
280             success = true;
281             assign_and_destroy_item( dst, p, index );
282         } else {
283             --base.my_rep->n_invalid_entries;
284         }
285     }
286     return success;
287 }
288
289 template<typename T>
290 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
291     head_counter = src.head_counter;
292     tail_counter = src.tail_counter;
293     page_mutex   = src.page_mutex;
294
295     const page* srcp = src.head_page;
296     if( is_valid_page(srcp) ) {
297         ticket g_index = head_counter;
298         __TBB_TRY {
299             size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
300             size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
301             size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
302
303             head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
304             page* cur_page = head_page;
305
306             if( srcp != src.tail_page ) {
307                 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
308                     cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
309                     cur_page = cur_page->next;
310                 }
311
312                 __TBB_ASSERT( srcp==src.tail_page, NULL );
313                 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
314                 if( last_index==0 ) last_index = base.my_rep->items_per_page;
315
316                 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
317                 cur_page = cur_page->next;
318             }
319             tail_page = cur_page;
320         } __TBB_CATCH (...) {
321             invalidate_page_and_rethrow( g_index );
322         }
323     } else {
324         head_page = tail_page = NULL;
325     }
326     return *this;
327 }
328
329 template<typename T>
330 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
331     // Append an invalid page at address 1 so that no more pushes are allowed.
332     page* invalid_page = (page*)uintptr_t(1);
333     {
334         spin_mutex::scoped_lock lock( page_mutex );
335         tail_counter = k+concurrent_queue_rep_base::n_queue+1;
336         page* q = tail_page;
337         if( is_valid_page(q) )
338             q->next = invalid_page;
339         else
340             head_page = invalid_page;
341         tail_page = invalid_page;
342     }
343     __TBB_RETHROW();
344 }
345
346 template<typename T>
347 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
348     concurrent_queue_page_allocator& pa = base;
349     page* new_page = pa.allocate_page();
350     new_page->next = NULL;
351     new_page->mask = src_page->mask;
352     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353         if( new_page->mask & uintptr_t(1)<<begin_in_page )
354             copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
355     return new_page;
356 }
357
358 template<typename T>
359 class micro_queue_pop_finalizer: no_copy {
360     typedef concurrent_queue_rep_base::page page;
361     ticket my_ticket;
362     micro_queue<T>& my_queue;
363     page* my_page; 
364     concurrent_queue_page_allocator& allocator;
365 public:
366     micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
367         my_ticket(k), my_queue(queue), my_page(p), allocator(b)
368     {}
369     ~micro_queue_pop_finalizer() ;
370 };
371
372 template<typename T>
373 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
374     page* p = my_page;
375     if( is_valid_page(p) ) {
376         spin_mutex::scoped_lock lock( my_queue.page_mutex );
377         page* q = p->next;
378         my_queue.head_page = q;
379         if( !is_valid_page(q) ) {
380             my_queue.tail_page = NULL;
381         }
382     }
383     my_queue.head_counter = my_ticket;
384     if( is_valid_page(p) ) {
385         allocator.deallocate_page( p );
386     }
387 }
388
389 #if _MSC_VER && !defined(__INTEL_COMPILER)
390 #pragma warning( pop )
391 #endif // warning 4146 is back
392
393 template<typename T> class concurrent_queue_iterator_rep ;
394 template<typename T> class concurrent_queue_iterator_base_v3;
395
396 //! representation of concurrent_queue_base
397 /**
398  * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
399  */
400 template<typename T>
401 struct concurrent_queue_rep : public concurrent_queue_rep_base {
402     micro_queue<T> array[n_queue];
403
404     //! Map ticket to an array index
405     static size_t index( ticket k ) {
406         return k*phi%n_queue;
407     }
408
409     micro_queue<T>& choose( ticket k ) {
410         // The formula here approximates LRU in a cache-oblivious way.
411         return array[index(k)];
412     }
413 };
414
415 //! base class of concurrent_queue
416 /**
417  * The class implements the interface defined by concurrent_queue_page_allocator
418  * and has a pointer to an instance of concurrent_queue_rep.
419  */
420 template<typename T>
421 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
422     //! Internal representation
423     concurrent_queue_rep<T>* my_rep;
424
425     friend struct concurrent_queue_rep<T>;
426     friend class micro_queue<T>;
427     friend class concurrent_queue_iterator_rep<T>;
428     friend class concurrent_queue_iterator_base_v3<T>;
429
430 protected:
431     typedef typename concurrent_queue_rep<T>::page page;
432
433 private:
434     typedef typename micro_queue<T>::padded_page padded_page;
435
436     /* override */ virtual page *allocate_page() {
437         concurrent_queue_rep<T>& r = *my_rep;
438         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
439         return reinterpret_cast<page*>(allocate_block ( n ));
440     }
441
442     /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
443         concurrent_queue_rep<T>& r = *my_rep;
444         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
445         deallocate_block( reinterpret_cast<void*>(p), n );
446     }
447
448     //! custom allocator
449     virtual void *allocate_block( size_t n ) = 0;
450
451     //! custom de-allocator
452     virtual void deallocate_block( void *p, size_t n ) = 0;
453
454 protected:
455     concurrent_queue_base_v3();
456
457     /* override */ virtual ~concurrent_queue_base_v3() {
458 #if __TBB_USE_ASSERT
459         size_t nq = my_rep->n_queue;
460         for( size_t i=0; i<nq; i++ )
461             __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
462 #endif /* __TBB_USE_ASSERT */
463         cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
464     }
465
466     //! Enqueue item at tail of queue
467     void internal_push( const void* src ) {
468         concurrent_queue_rep<T>& r = *my_rep;
469         ticket k = r.tail_counter++;
470         r.choose(k).push( src, k, *this );
471     }
472
473     //! Attempt to dequeue item from queue.
474     /** NULL if there was no item to dequeue. */
475     bool internal_try_pop( void* dst ) ;
476
477     //! Get size of queue; result may be invalid if queue is modified concurrently
478     size_t internal_size() const ;
479
480     //! check if the queue is empty; thread safe
481     bool internal_empty() const ;
482
483     //! free any remaining pages
484     /* note that the name may be misleading, but it remains so due to a historical accident. */
485     void internal_finish_clear() ;
486
487     //! Obsolete
488     void internal_throw_exception() const {
489         throw_exception( eid_bad_alloc );
490     }
491
492     //! copy internal representation
493     void assign( const concurrent_queue_base_v3& src ) ;
494 };
495
496 template<typename T>
497 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
498     const size_t item_size = sizeof(T);
499     my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
500     __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
501     __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
502     __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
503     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
504     memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
505     my_rep->item_size = item_size;
506     my_rep->items_per_page = item_size<=8 ? 32 :
507                              item_size<=16 ? 16 : 
508                              item_size<=32 ? 8 :
509                              item_size<=64 ? 4 :
510                              item_size<=128 ? 2 :
511                              1;
512 }
513
514 template<typename T>
515 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
516     concurrent_queue_rep<T>& r = *my_rep;
517     ticket k;
518     do {
519         k = r.head_counter;
520         for(;;) {
521             if( r.tail_counter<=k ) {
522                 // Queue is empty 
523                 return false;
524             }
525             // Queue had item with ticket k when we looked.  Attempt to get that item.
526             ticket tk=k;
527 #if defined(_MSC_VER) && defined(_Wp64)
528     #pragma warning (push)
529     #pragma warning (disable: 4267)
530 #endif
531             k = r.head_counter.compare_and_swap( tk+1, tk );
532 #if defined(_MSC_VER) && defined(_Wp64)
533     #pragma warning (pop)
534 #endif
535             if( k==tk )
536                 break;
537             // Another thread snatched the item, retry.
538         }
539     } while( !r.choose( k ).pop( dst, k, *this ) );
540     return true;
541 }
542
543 template<typename T>
544 size_t concurrent_queue_base_v3<T>::internal_size() const {
545     concurrent_queue_rep<T>& r = *my_rep;
546     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
547     ticket hc = r.head_counter;
548     size_t nie = r.n_invalid_entries;
549     ticket tc = r.tail_counter;
550     __TBB_ASSERT( hc!=tc || !nie, NULL );
551     ptrdiff_t sz = tc-hc-nie;
552     return sz<0 ? 0 :  size_t(sz);
553 }
554
555 template<typename T>
556 bool concurrent_queue_base_v3<T>::internal_empty() const {
557     concurrent_queue_rep<T>& r = *my_rep;
558     ticket tc = r.tail_counter;
559     ticket hc = r.head_counter;
560     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
561     return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
562 }
563
564 template<typename T>
565 void concurrent_queue_base_v3<T>::internal_finish_clear() {
566     concurrent_queue_rep<T>& r = *my_rep;
567     size_t nq = r.n_queue;
568     for( size_t i=0; i<nq; ++i ) {
569         page* tp = r.array[i].tail_page;
570         if( is_valid_page(tp) ) {
571             __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
572             deallocate_page( tp );
573             r.array[i].tail_page = NULL;
574         } else 
575             __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
576     }
577 }
578
579 template<typename T>
580 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
581     concurrent_queue_rep<T>& r = *my_rep;
582     r.items_per_page = src.my_rep->items_per_page;
583
584     // copy concurrent_queue_rep.
585     r.head_counter = src.my_rep->head_counter;
586     r.tail_counter = src.my_rep->tail_counter;
587     r.n_invalid_entries = src.my_rep->n_invalid_entries;
588
589     // copy micro_queues
590     for( size_t i = 0; i<r.n_queue; ++i )
591         r.array[i].assign( src.my_rep->array[i], *this);
592
593     __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, 
594             "the source concurrent queue should not be concurrently modified." );
595 }
596
597 template<typename Container, typename Value> class concurrent_queue_iterator;
598
599 template<typename T>
600 class concurrent_queue_iterator_rep: no_assign {
601     typedef typename micro_queue<T>::padded_page padded_page;
602 public:
603     ticket head_counter;
604     const concurrent_queue_base_v3<T>& my_queue;
605     typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
606     concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
607         head_counter(queue.my_rep->head_counter),
608         my_queue(queue)
609     {
610         for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
611             array[k] = queue.my_rep->array[k].head_page;
612     }
613
614     //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
615     bool get_item( T*& item, size_t k ) ;
616 };
617
618 template<typename T>
619 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
620     if( k==my_queue.my_rep->tail_counter ) {
621         item = NULL;
622         return true;
623     } else {
624         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
625         __TBB_ASSERT(p,NULL);
626         size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
627         item = &micro_queue<T>::get_ref(*p,i);
628         return (p->mask & uintptr_t(1)<<i)!=0;
629     }
630 }
631
632 //! Constness-independent portion of concurrent_queue_iterator.
633 /** @ingroup containers */
634 template<typename Value>
635 class concurrent_queue_iterator_base_v3 : no_assign {
636     //! Represents concurrent_queue over which we are iterating.
637     /** NULL if one past last element in queue. */
638     concurrent_queue_iterator_rep<Value>* my_rep;
639
640     template<typename C, typename T, typename U>
641     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
642
643     template<typename C, typename T, typename U>
644     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
645 protected:
646     //! Pointer to current item
647     Value* my_item;
648
649     //! Default constructor
650     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
651 #if __GNUC__==4&&__GNUC_MINOR__==3
652         // to get around a possible gcc 4.3 bug
653         __asm__ __volatile__("": : :"memory");
654 #endif
655     }
656
657     //! Copy constructor
658     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
659     : no_assign(), my_rep(NULL), my_item(NULL) {
660         assign(i);
661     }
662
663     //! Construct iterator pointing to head of queue.
664     concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
665
666     //! Assignment
667     void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
668
669     //! Advance iterator one step towards tail of queue.
670     void advance() ;
671
672     //! Destructor
673     ~concurrent_queue_iterator_base_v3() {
674         cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
675         my_rep = NULL;
676     }
677 };
678
679 template<typename Value>
680 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
681     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
682     new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
683     size_t k = my_rep->head_counter;
684     if( !my_rep->get_item(my_item, k) ) advance();
685 }
686
687 template<typename Value>
688 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
689     if( my_rep!=other.my_rep ) {
690         if( my_rep ) {
691             cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
692             my_rep = NULL;
693         }
694         if( other.my_rep ) {
695             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
696             new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
697         }
698     }
699     my_item = other.my_item;
700 }
701
702 template<typename Value>
703 void concurrent_queue_iterator_base_v3<Value>::advance() {
704     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );  
705     size_t k = my_rep->head_counter;
706     const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
707 #if TBB_USE_ASSERT
708     Value* tmp;
709     my_rep->get_item(tmp,k);
710     __TBB_ASSERT( my_item==tmp, NULL );
711 #endif /* TBB_USE_ASSERT */
712     size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
713     if( i==queue.my_rep->items_per_page-1 ) {
714         typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
715         root = root->next;
716     }
717     // advance k
718     my_rep->head_counter = ++k;
719     if( !my_rep->get_item(my_item, k) ) advance();
720 }
721
722 //! Similar to C++0x std::remove_cv
723 /** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
724 template<typename T> struct tbb_remove_cv {typedef T type;};
725 template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
726 template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
727 template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
728
729 //! Meets requirements of a forward iterator for STL.
730 /** Value is either the T or const T type of the container.
731     @ingroup containers */
732 template<typename Container, typename Value>
733 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
734         public std::iterator<std::forward_iterator_tag,Value> {
735 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
736     template<typename T, class A>
737     friend class ::tbb::strict_ppl::concurrent_queue;
738 #else
739 public: // workaround for MSVC
740 #endif 
741     //! Construct iterator pointing to head of queue.
742     concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
743         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
744     {
745     }
746
747 public:
748     concurrent_queue_iterator() {}
749
750     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
751         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
752     {}
753
754     //! Iterator assignment
755     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
756         assign(other);
757         return *this;
758     }
759
760     //! Reference to current item 
761     Value& operator*() const {
762         return *static_cast<Value*>(this->my_item);
763     }
764
765     Value* operator->() const {return &operator*();}
766
767     //! Advance to next item in queue
768     concurrent_queue_iterator& operator++() {
769         this->advance();
770         return *this;
771     }
772
773     //! Post increment
774     Value* operator++(int) {
775         Value* result = &operator*();
776         operator++();
777         return result;
778     }
779 }; // concurrent_queue_iterator
780
781
782 template<typename C, typename T, typename U>
783 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
784     return i.my_item==j.my_item;
785 }
786
787 template<typename C, typename T, typename U>
788 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
789     return i.my_item!=j.my_item;
790 }
791
792 } // namespace internal
793
794 //! @endcond
795
796 } // namespace strict_ppl
797
798 //! @cond INTERNAL
799 namespace internal {
800
801 class concurrent_queue_rep;
802 class concurrent_queue_iterator_rep;
803 class concurrent_queue_iterator_base_v3;
804 template<typename Container, typename Value> class concurrent_queue_iterator;
805
806 //! For internal use only.
807 /** Type-independent portion of concurrent_queue.
808     @ingroup containers */
809 class concurrent_queue_base_v3: no_copy {
810     //! Internal representation
811     concurrent_queue_rep* my_rep;
812
813     friend class concurrent_queue_rep;
814     friend struct micro_queue;
815     friend class micro_queue_pop_finalizer;
816     friend class concurrent_queue_iterator_rep;
817     friend class concurrent_queue_iterator_base_v3;
818 protected:
819     //! Prefix on a page
820     struct page {
821         page* next;
822         uintptr_t mask; 
823     };
824
825     //! Capacity of the queue
826     ptrdiff_t my_capacity;
827    
828     //! Always a power of 2
829     size_t items_per_page;
830
831     //! Size of an item
832     size_t item_size;
833
834 #if __TBB_GCC_3_3_PROTECTED_BROKEN
835 public:
836 #endif
837     template<typename T>
838     struct padded_page: page {
839         //! Not defined anywhere - exists to quiet warnings.
840         padded_page(); 
841         //! Not defined anywhere - exists to quiet warnings.
842         void operator=( const padded_page& );
843         //! Must be last field.
844         T last;
845     };
846
847 private:
848     virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
849     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
850 protected:
851     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
852     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
853
854     //! Enqueue item at tail of queue
855     void __TBB_EXPORTED_METHOD internal_push( const void* src );
856
857     //! Dequeue item from head of queue
858     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
859
860     //! Attempt to enqueue item onto queue.
861     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
862
863     //! Attempt to dequeue item from queue.
864     /** NULL if there was no item to dequeue. */
865     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
866
867     //! Get size of queue
868     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
869
870     //! Check if the queue is emtpy
871     bool __TBB_EXPORTED_METHOD internal_empty() const;
872
873     //! Set the queue capacity
874     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
875
876     //! custom allocator
877     virtual page *allocate_page() = 0;
878
879     //! custom de-allocator
880     virtual void deallocate_page( page *p ) = 0;
881
882     //! free any remaining pages
883     /* note that the name may be misleading, but it remains so due to a historical accident. */
884     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
885
886     //! throw an exception
887     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
888
889     //! copy internal representation
890     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
891
892 private:
893     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
894 };
895
896 //! Type-independent portion of concurrent_queue_iterator.
897 /** @ingroup containers */
898 class concurrent_queue_iterator_base_v3 {
899     //! concurrent_queue over which we are iterating.
900     /** NULL if one past last element in queue. */
901     concurrent_queue_iterator_rep* my_rep;
902
903     template<typename C, typename T, typename U>
904     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
905
906     template<typename C, typename T, typename U>
907     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
908
909     void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
910 protected:
911     //! Pointer to current item
912     void* my_item;
913
914     //! Default constructor
915     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
916
917     //! Copy constructor
918     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
919         assign(i);
920     }
921
922     //! Obsolete entry point for constructing iterator pointing to head of queue.
923     /** Does not work correctly for SSE types. */
924     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
925
926     //! Construct iterator pointing to head of queue.
927     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
928
929     //! Assignment
930     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
931
932     //! Advance iterator one step towards tail of queue.
933     void __TBB_EXPORTED_METHOD advance();
934
935     //! Destructor
936     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
937 };
938
939 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
940
941 //! Meets requirements of a forward iterator for STL.
942 /** Value is either the T or const T type of the container.
943     @ingroup containers */
944 template<typename Container, typename Value>
945 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
946         public std::iterator<std::forward_iterator_tag,Value> {
947
948 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
949     template<typename T, class A>
950     friend class ::tbb::concurrent_bounded_queue;
951
952     template<typename T, class A>
953     friend class ::tbb::deprecated::concurrent_queue;
954 #else
955 public: // workaround for MSVC
956 #endif 
957     //! Construct iterator pointing to head of queue.
958     concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
959         concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
960     {
961     }
962
963 public:
964     concurrent_queue_iterator() {}
965
966     /** If Value==Container::value_type, then this routine is the copy constructor. 
967         If Value==const Container::value_type, then this routine is a conversion constructor. */
968     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
969         concurrent_queue_iterator_base_v3(other)
970     {}
971
972     //! Iterator assignment
973     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
974         assign(other);
975         return *this;
976     }
977
978     //! Reference to current item 
979     Value& operator*() const {
980         return *static_cast<Value*>(my_item);
981     }
982
983     Value* operator->() const {return &operator*();}
984
985     //! Advance to next item in queue
986     concurrent_queue_iterator& operator++() {
987         advance();
988         return *this;
989     }
990
991     //! Post increment
992     Value* operator++(int) {
993         Value* result = &operator*();
994         operator++();
995         return result;
996     }
997 }; // concurrent_queue_iterator
998
999
1000 template<typename C, typename T, typename U>
1001 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1002     return i.my_item==j.my_item;
1003 }
1004
1005 template<typename C, typename T, typename U>
1006 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1007     return i.my_item!=j.my_item;
1008 }
1009
1010 } // namespace internal;
1011
1012 //! @endcond
1013
1014 } // namespace tbb
1015
1016 #endif /* __TBB_concurrent_queue_internal_H */