]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/concurrent_priority_queue.h
247c91819e64b2572b8173d15fd19afb64af8dcf
[casparcg] / dependencies64 / tbb / include / tbb / concurrent_priority_queue.h
1 /*
2     Copyright 2005-2014 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5     you can redistribute it and/or modify it under the terms of the GNU General Public License
6     version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
7     distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9     See  the GNU General Public License for more details.   You should have received a copy of
10     the  GNU General Public License along with Threading Building Blocks; if not, write to the
11     Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA
12
13     As a special exception,  you may use this file  as part of a free software library without
14     restriction.  Specifically,  if other files instantiate templates  or use macros or inline
15     functions from this file, or you compile this file and link it with other files to produce
16     an executable,  this file does not by itself cause the resulting executable to be covered
17     by the GNU General Public License. This exception does not however invalidate any other
18     reasons why the executable file might be covered by the GNU General Public License.
19 */
20
21 #ifndef __TBB_concurrent_priority_queue_H
22 #define __TBB_concurrent_priority_queue_H
23
24 #include "atomic.h"
25 #include "cache_aligned_allocator.h"
26 #include "tbb_exception.h"
27 #include "tbb_stddef.h"
28 #include "tbb_profiling.h"
29 #include "internal/_aggregator_impl.h"
30 #include <vector>
31 #include <iterator>
32 #include <functional>
33
34 #if __TBB_INITIALIZER_LISTS_PRESENT
35     #include <initializer_list>
36 #endif
37
38 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
39     #include <type_traits>
40 #endif
41
42 namespace tbb {
43 namespace interface5 {
44 namespace internal {
45 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
46     template<typename T, bool C = std::is_copy_constructible<T>::value>
47     struct use_element_copy_constructor {
48         typedef tbb::internal::true_type type;
49     };
50     template<typename T>
51     struct use_element_copy_constructor <T,false> {
52         typedef tbb::internal::false_type type;
53     };
54 #else
55     template<typename>
56     struct use_element_copy_constructor {
57         typedef tbb::internal::true_type type;
58     };
59 #endif
60 } // namespace internal
61
62 using namespace tbb::internal;
63
64 //! Concurrent priority queue
65 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
66 class concurrent_priority_queue {
67  public:
68     //! Element type in the queue.
69     typedef T value_type;
70
71     //! Reference type
72     typedef T& reference;
73
74     //! Const reference type
75     typedef const T& const_reference;
76
77     //! Integral type for representing size of the queue.
78     typedef size_t size_type;
79
80     //! Difference type for iterator
81     typedef ptrdiff_t difference_type;
82
83     //! Allocator type
84     typedef A allocator_type;
85
86     //! Constructs a new concurrent_priority_queue with default capacity
87     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
88     {
89         my_aggregator.initialize_handler(my_functor_t(this));
90     }
91
92     //! Constructs a new concurrent_priority_queue with init_sz capacity
93     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
94         mark(0), my_size(0), data(a)
95     {
96         data.reserve(init_capacity);
97         my_aggregator.initialize_handler(my_functor_t(this));
98     }
99
100     //! [begin,end) constructor
101     template<typename InputIterator>
102     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
103         mark(0), data(begin, end, a)
104     {
105         my_aggregator.initialize_handler(my_functor_t(this));
106         heapify();
107         my_size = data.size();
108     }
109
110 #if __TBB_INITIALIZER_LISTS_PRESENT
111     //! Constructor from std::initializer_list
112     concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
113         mark(0),data(init_list.begin(), init_list.end(), a)
114     {
115         my_aggregator.initialize_handler(my_functor_t(this));
116         heapify();
117         my_size = data.size();
118     }
119 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
120
121     //! Copy constructor
122     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
123     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
124         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
125     {
126         my_aggregator.initialize_handler(my_functor_t(this));
127         heapify();
128     }
129
130     //! Copy constructor with specific allocator
131     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
132     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
133         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
134     {
135         my_aggregator.initialize_handler(my_functor_t(this));
136         heapify();
137     }
138
139     //! Assignment operator
140     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
141     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
142         if (this != &src) {
143             vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
144             mark = src.mark;
145             my_size = src.my_size;
146         }
147         return *this;
148     }
149
150 #if __TBB_CPP11_RVALUE_REF_PRESENT
151     //! Move constructor
152     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
153     concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
154         my_size(src.my_size), data(std::move(src.data))
155     {
156         my_aggregator.initialize_handler(my_functor_t(this));
157     }
158
159     //! Move constructor with specific allocator
160     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
161     concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
162         my_size(src.my_size),
163 #if __TBB_ALLOCATOR_TRAITS_PRESENT
164         data(std::move(src.data), a)
165 #else
166     // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
167     // It seems that the reason is absence of support of allocator_traits (stateful allocators).
168         data(a)
169 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
170     {
171         my_aggregator.initialize_handler(my_functor_t(this));
172 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
173         if (a != src.data.get_allocator()){
174             data.reserve(src.data.size());
175             data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
176         }else{
177             data = std::move(src.data);
178         }
179 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
180     }
181
182     //! Move assignment operator
183     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
184     concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
185         if (this != &src) {
186             mark = src.mark;
187             my_size = src.my_size;
188 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
189             if (data.get_allocator() != src.data.get_allocator()){
190                 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
191             }else
192 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
193             {
194                 data = std::move(src.data);
195             }
196         }
197         return *this;
198     }
199 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
200
201     //! Assign the queue from [begin,end) range, not thread-safe
202     template<typename InputIterator>
203     void assign(InputIterator begin, InputIterator end) {
204         vector_t(begin, end, data.get_allocator()).swap(data);
205         mark = 0;
206         my_size = data.size();
207         heapify();
208     }
209
210 #if __TBB_INITIALIZER_LISTS_PRESENT
211     //! Assign the queue from std::initializer_list, not thread-safe
212     void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
213
214     //! Assign from std::initializer_list, not thread-safe
215     concurrent_priority_queue& operator=(std::initializer_list<T> il) {
216         this->assign(il.begin(), il.end());
217         return *this;
218     }
219 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
220
221     //! Returns true if empty, false otherwise
222     /** Returned value may not reflect results of pending operations.
223         This operation reads shared data and will trigger a race condition. */
224     bool empty() const { return size()==0; }
225
226     //! Returns the current number of elements contained in the queue
227     /** Returned value may not reflect results of pending operations.
228         This operation reads shared data and will trigger a race condition. */
229     size_type size() const { return __TBB_load_with_acquire(my_size); }
230
231     //! Pushes elem onto the queue, increasing capacity of queue if necessary
232     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
233     void push(const_reference elem) {
234 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
235         __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
236 #endif
237         cpq_operation op_data(elem, PUSH_OP);
238         my_aggregator.execute(&op_data);
239         if (op_data.status == FAILED) // exception thrown
240             throw_exception(eid_bad_alloc);
241     }
242
243 #if __TBB_CPP11_RVALUE_REF_PRESENT
244     //! Pushes elem onto the queue, increasing capacity of queue if necessary
245     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
246     void push(value_type &&elem) {
247         cpq_operation op_data(elem, PUSH_RVALUE_OP);
248         my_aggregator.execute(&op_data);
249         if (op_data.status == FAILED) // exception thrown
250             throw_exception(eid_bad_alloc);
251     }
252
253 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
254     //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
255     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
256     template<typename... Args>
257     void emplace(Args&&... args) {
258         push(value_type(std::forward<Args>(args)...));
259     }
260 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
261 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
262
263     //! Gets a reference to and removes highest priority element
264     /** If a highest priority element was found, sets elem and returns true,
265         otherwise returns false.
266         This operation can be safely used concurrently with other push, try_pop or emplace operations. */
267     bool try_pop(reference elem) {
268         cpq_operation op_data(POP_OP);
269         op_data.elem = &elem;
270         my_aggregator.execute(&op_data);
271         return op_data.status==SUCCEEDED;
272     }
273
274     //! Clear the queue; not thread-safe
275     /** This operation is unsafe if there are pending concurrent operations on the queue.
276         Resets size, effectively emptying queue; does not free space.
277         May not clear elements added in pending operations. */
278     void clear() {
279         data.clear();
280         mark = 0;
281         my_size = 0;
282     }
283
284     //! Swap this queue with another; not thread-safe
285     /** This operation is unsafe if there are pending concurrent operations on the queue. */
286     void swap(concurrent_priority_queue& q) {
287         using std::swap;
288         data.swap(q.data);
289         swap(mark, q.mark);
290         swap(my_size, q.my_size);
291     }
292
293     //! Return allocator object
294     allocator_type get_allocator() const { return data.get_allocator(); }
295
296  private:
297     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
298     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
299
300     class cpq_operation : public aggregated_operation<cpq_operation> {
301      public:
302         operation_type type;
303         union {
304             value_type *elem;
305             size_type sz;
306         };
307         cpq_operation(const_reference e, operation_type t) :
308             type(t), elem(const_cast<value_type*>(&e)) {}
309         cpq_operation(operation_type t) : type(t) {}
310     };
311
312     class my_functor_t {
313         concurrent_priority_queue<T, Compare, A> *cpq;
314      public:
315         my_functor_t() {}
316         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
317         void operator()(cpq_operation* op_list) {
318             cpq->handle_operations(op_list);
319         }
320     };
321
322     typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
323     aggregator_t my_aggregator;
324     //! Padding added to avoid false sharing
325     char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
326     //! The point at which unsorted elements begin
327     size_type mark;
328     __TBB_atomic size_type my_size;
329     Compare compare;
330     //! Padding added to avoid false sharing
331     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
332     //! Storage for the heap of elements in queue, plus unheapified elements
333     /** data has the following structure:
334
335          binary unheapified
336           heap   elements
337         ____|_______|____
338         |       |       |
339         v       v       v
340         [_|...|_|_|...|_| |...| ]
341          0       ^       ^       ^
342                  |       |       |__capacity
343                  |       |__my_size
344                  |__mark
345
346         Thus, data stores the binary heap starting at position 0 through
347         mark-1 (it may be empty).  Then there are 0 or more elements
348         that have not yet been inserted into the heap, in positions
349         mark through my_size-1. */
350     typedef std::vector<value_type, allocator_type> vector_t;
351     vector_t data;
352
353     void handle_operations(cpq_operation *op_list) {
354         cpq_operation *tmp, *pop_list=NULL;
355
356         __TBB_ASSERT(mark == data.size(), NULL);
357
358         // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
359         while (op_list) {
360             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
361             // node. This thread is going to handle the operation, and so will acquire it
362             // and perform the associated operation w/o triggering a race condition; the
363             // thread that created the operation is waiting on the status field, so when
364             // this thread is done with the operation, it will perform a
365             // store_with_release to give control back to the waiting thread in
366             // aggregator::insert_operation.
367             call_itt_notify(acquired, &(op_list->status));
368             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
369             tmp = op_list;
370             op_list = itt_hide_load_word(op_list->next);
371             if (tmp->type == POP_OP) {
372                 if (mark < data.size() &&
373                     compare(data[0], data[data.size()-1])) {
374                     // there are newly pushed elems and the last one
375                     // is higher than top
376                     *(tmp->elem) = move(data[data.size()-1]);
377                     __TBB_store_with_release(my_size, my_size-1);
378                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
379                     data.pop_back();
380                     __TBB_ASSERT(mark<=data.size(), NULL);
381                 }
382                 else { // no convenient item to pop; postpone
383                     itt_hide_store_word(tmp->next, pop_list);
384                     pop_list = tmp;
385                 }
386             } else { // PUSH_OP or PUSH_RVALUE_OP
387                 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
388                 __TBB_TRY{
389                     if (tmp->type == PUSH_OP) {
390                         push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
391                     } else {
392                         data.push_back(move(*(tmp->elem)));
393                     }
394                     __TBB_store_with_release(my_size, my_size + 1);
395                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
396                 } __TBB_CATCH(...) {
397                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
398                 }
399             }
400         }
401
402         // second pass processes pop operations
403         while (pop_list) {
404             tmp = pop_list;
405             pop_list = itt_hide_load_word(pop_list->next);
406             __TBB_ASSERT(tmp->type == POP_OP, NULL);
407             if (data.empty()) {
408                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
409             }
410             else {
411                 __TBB_ASSERT(mark<=data.size(), NULL);
412                 if (mark < data.size() &&
413                     compare(data[0], data[data.size()-1])) {
414                     // there are newly pushed elems and the last one is
415                     // higher than top
416                     *(tmp->elem) = move(data[data.size()-1]);
417                     __TBB_store_with_release(my_size, my_size-1);
418                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
419                     data.pop_back();
420                 }
421                 else { // extract top and push last element down heap
422                     *(tmp->elem) = move(data[0]);
423                     __TBB_store_with_release(my_size, my_size-1);
424                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
425                     reheap();
426                 }
427             }
428         }
429
430         // heapify any leftover pushed elements before doing the next
431         // batch of operations
432         if (mark<data.size()) heapify();
433         __TBB_ASSERT(mark == data.size(), NULL);
434     }
435
436     //! Merge unsorted elements into heap
437     void heapify() {
438         if (!mark && data.size()>0) mark = 1;
439         for (; mark<data.size(); ++mark) {
440             // for each unheapified element under size
441             size_type cur_pos = mark;
442             value_type to_place = move(data[mark]);
443             do { // push to_place up the heap
444                 size_type parent = (cur_pos-1)>>1;
445                 if (!compare(data[parent], to_place)) break;
446                 data[cur_pos] = move(data[parent]);
447                 cur_pos = parent;
448             } while( cur_pos );
449             data[cur_pos] = move(to_place);
450         }
451     }
452
453     //! Re-heapify after an extraction
454     /** Re-heapify by pushing last element down the heap from the root. */
455     void reheap() {
456         size_type cur_pos=0, child=1;
457
458         while (child < mark) {
459             size_type target = child;
460             if (child+1 < mark && compare(data[child], data[child+1]))
461                 ++target;
462             // target now has the higher priority child
463             if (compare(data[target], data[data.size()-1])) break;
464             data[cur_pos] = move(data[target]);
465             cur_pos = target;
466             child = (cur_pos<<1)+1;
467         }
468         if (cur_pos != data.size()-1)
469             data[cur_pos] = move(data[data.size()-1]);
470         data.pop_back();
471         if (mark > data.size()) mark = data.size();
472     }
473
474     void push_back_helper(const T& t, tbb::internal::true_type) {
475         data.push_back(t);
476     }
477
478     void push_back_helper(const T&, tbb::internal::false_type) {
479         __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
480     }
481 };
482
483 } // namespace interface5
484
485 using interface5::concurrent_priority_queue;
486
487 } // namespace tbb
488
489 #endif /* __TBB_concurrent_priority_queue_H */