2 Copyright 2005-2014 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5 you can redistribute it and/or modify it under the terms of the GNU General Public License
6 version 2 as published by the Free Software Foundation. Threading Building Blocks is
7 distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9 See the GNU General Public License for more details. You should have received a copy of
10 the GNU General Public License along with Threading Building Blocks; if not, write to the
11 Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
13 As a special exception, you may use this file as part of a free software library without
14 restriction. Specifically, if other files instantiate templates or use macros or inline
15 functions from this file, or you compile this file and link it with other files to produce
16 an executable, this file does not by itself cause the resulting executable to be covered
17 by the GNU General Public License. This exception does not however invalidate any other
18 reasons why the executable file might be covered by the GNU General Public License.
21 #ifndef __TBB_concurrent_priority_queue_H
22 #define __TBB_concurrent_priority_queue_H
25 #include "cache_aligned_allocator.h"
26 #include "tbb_exception.h"
27 #include "tbb_stddef.h"
28 #include "tbb_profiling.h"
29 #include "internal/_aggregator_impl.h"
34 #if __TBB_INITIALIZER_LISTS_PRESENT
35 #include <initializer_list>
38 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
39 #include <type_traits>
43 namespace interface5 {
45 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
// Compile-time trait used by the queue's push path to choose between a copying
// push (copy-constructible T) and a compile-time rejection (move-only T).
// Primary template: T is copy-constructible, so `type` is true_type.
46 template<typename T, bool C = std::is_copy_constructible<T>::value>
47 struct use_element_copy_constructor {
48 typedef tbb::internal::true_type type;
// NOTE(review): lines elided here (closing brace of the primary template and the
// partial-specialization header `template<typename T> struct ...<T,false>`) —
// restore from upstream TBB before compiling.
// Partial specialization for non-copy-constructible T: `type` is false_type,
// which routes handle_operations() to the asserting push_back_helper overload.
51 struct use_element_copy_constructor <T,false> {
52 typedef tbb::internal::false_type type;
// NOTE(review): lines elided (closing brace and the `#else` branch header).
// Pre-C++11 fallback: without std::is_copy_constructible, assume copyability.
56 struct use_element_copy_constructor {
57 typedef tbb::internal::true_type type;
// NOTE(review): closing brace and `#endif` for the feature test appear elided.
60 } // namespace internal
62 using namespace tbb::internal;
64 //! Concurrent priority queue
// Highest-priority-first queue (per Compare, default std::less => max-queue).
// Concurrent push/try_pop/emplace are serialized through an aggregator so a
// single thread applies a batch of pending operations at a time.
65 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
66 class concurrent_priority_queue {
// NOTE(review): an access-specifier line (likely `public:`) and the typedef
// lines for `value_type` and `reference` appear elided between here and the
// const_reference typedef — verify against upstream TBB.
68 //! Element type in the queue.
74 //! Const reference type
75 typedef const T& const_reference;
77 //! Integral type for representing size of the queue.
78 typedef size_t size_type;
80 //! Difference type for iterator
81 typedef ptrdiff_t difference_type;
// Allocator type rebound for the element storage vector.
84 typedef A allocator_type;
86 //! Constructs a new concurrent_priority_queue with default capacity
// Initializes an empty queue; the aggregator is bound to this instance's
// handle_operations via my_functor_t.
87 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
// NOTE(review): the constructor body's opening brace appears elided here.
89 my_aggregator.initialize_handler(my_functor_t(this));
// NOTE(review): closing brace elided.
92 //! Constructs a new concurrent_priority_queue with init_sz capacity
// Same as the default constructor but pre-reserves storage for init_capacity
// elements to avoid early reallocations.
93 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
94 mark(0), my_size(0), data(a)
// NOTE(review): opening brace elided.
96 data.reserve(init_capacity);
97 my_aggregator.initialize_handler(my_functor_t(this));
// NOTE(review): closing brace elided.
100 //! [begin,end) constructor
// Bulk-constructs from an iterator range; my_size is taken from the populated
// vector after construction.
101 template<typename InputIterator>
102 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
103 mark(0), data(begin, end, a)
// NOTE(review): opening brace elided; a statement between the two visible body
// lines (original line 106, presumably a heapify() call) also appears elided —
// confirm against upstream TBB.
105 my_aggregator.initialize_handler(my_functor_t(this));
107 my_size = data.size();
// NOTE(review): closing brace elided.
110 #if __TBB_INITIALIZER_LISTS_PRESENT
111 //! Constructor from std::initializer_list
// Delegates to the range-style vector construction over the list's contents.
112 concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
113 mark(0),data(init_list.begin(), init_list.end(), a)
// NOTE(review): opening brace elided; a statement (original line 116, presumably
// heapify()) also appears elided — confirm against upstream TBB.
115 my_aggregator.initialize_handler(my_functor_t(this));
117 my_size = data.size();
// NOTE(review): closing brace elided.
119 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
// Copy constructor: copies the element storage and the heapified mark/size
// bookkeeping from src.
122 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
123 explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
124 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
// NOTE(review): opening brace elided.
126 my_aggregator.initialize_handler(my_functor_t(this));
// NOTE(review): one or more lines elided (original 127-129, likely heapify()
// and the closing brace) — confirm against upstream TBB.
130 //! Copy constructor with specific allocator
131 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
132 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
133 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
// NOTE(review): opening brace elided.
135 my_aggregator.initialize_handler(my_functor_t(this));
// NOTE(review): lines elided (original 136-138, likely heapify() and the
// closing brace).
139 //! Assignment operator
140 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
141 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
// NOTE(review): a line elided (original 142, presumably a self-assignment
// guard `if (this != &src)` with an opening brace).
// Copy-and-swap idiom: build a fresh vector from src and swap it into data so
// the operation is strongly exception-safe w.r.t. the element copies.
143 vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
// NOTE(review): a line elided (original 144, presumably `mark = src.mark;`).
145 my_size = src.my_size;
// NOTE(review): trailing lines elided (closing brace(s) and `return *this;`).
150 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Move constructor: steals src's vector wholesale; mark/my_size are copied
// (src is left in a valid but unspecified state).
152 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
153 concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
154 my_size(src.my_size), data(std::move(src.data))
// NOTE(review): opening brace elided.
156 my_aggregator.initialize_handler(my_functor_t(this));
// NOTE(review): closing brace elided.
159 //! Move constructor with specific allocator
160 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
161 concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
162 my_size(src.my_size),
163 #if __TBB_ALLOCATOR_TRAITS_PRESENT
// With allocator_traits support, the allocator-extended vector move ctor does
// the right thing (moves if allocators match, element-moves otherwise).
164 data(std::move(src.data), a)
// NOTE(review): the `#else` line (original 165) and the fallback initializer
// `data(a)` (original 168) appear elided from this branch.
166 // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
167 // It seems that the reason is absence of support of allocator_traits (stateful allocators).
169 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
// NOTE(review): the constructor body's opening brace (original 170) elided.
171 my_aggregator.initialize_handler(my_functor_t(this));
172 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
// Manual emulation of the allocator-extended move: if allocators differ, the
// elements must be moved one by one into storage owned by `a`; otherwise the
// whole buffer can be stolen.
173 if (a != src.data.get_allocator()){
174 data.reserve(src.data.size());
175 data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
// NOTE(review): lines elided (original 176/178 — the closing brace of the `if`
// and an `else` introducing the steal branch below).
177 data = std::move(src.data);
179 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
// NOTE(review): the constructor's closing brace (original ~180) elided.
182 //! Move assignment operator
183 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
184 concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
// NOTE(review): lines elided (original 185-186, presumably a self-assignment
// guard and `mark = src.mark;`).
187 my_size = src.my_size;
188 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
// Without allocator_traits, a cross-allocator move must element-move into a
// temporary vector owned by *our* allocator, then swap it in.
189 if (data.get_allocator() != src.data.get_allocator()){
190 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
// NOTE(review): a line elided (original 191, closing brace of the `if` plus an
// `else` guarding the steal below).
192 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
// With allocator_traits (or matching allocators), vector's move assignment
// handles propagation/steal per the allocator's traits.
194 data = std::move(src.data);
// NOTE(review): trailing lines elided (original 195-198 — closing braces and
// `return *this;`).
199 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
201 //! Assign the queue from [begin,end) range, not thread-safe
// Replaces the contents via copy-and-swap; resets size from the new storage.
202 template<typename InputIterator>
203 void assign(InputIterator begin, InputIterator end) {
204 vector_t(begin, end, data.get_allocator()).swap(data);
// NOTE(review): a line elided (original 205, presumably `mark = 0;`).
206 my_size = data.size();
// NOTE(review): lines elided (original 207-209, presumably heapify() and the
// closing brace) — confirm against upstream TBB.
210 #if __TBB_INITIALIZER_LISTS_PRESENT
211 //! Assign the queue from std::initializer_list, not thread-safe
212 void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
214 //! Assign from std::initializer_list, not thread-safe
215 concurrent_priority_queue& operator=(std::initializer_list<T> il) {
216 this->assign(il.begin(), il.end());
// NOTE(review): lines elided (original 217-218 — `return *this;` and the
// closing brace).
219 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
221 //! Returns true if empty, false otherwise
222 /** Returned value may not reflect results of pending operations.
223 This operation reads shared data and will trigger a race condition. */
// Defined in terms of size() so both share the same acquire-load of my_size.
224 bool empty() const { return size()==0; }
226 //! Returns the current number of elements contained in the queue
227 /** Returned value may not reflect results of pending operations.
228 This operation reads shared data and will trigger a race condition. */
// Acquire load pairs with the release stores performed in handle_operations,
// so a reader observes a size published by a completed batch.
229 size_type size() const { return __TBB_load_with_acquire(my_size); }
231 //! Pushes elem onto the queue, increasing capacity of queue if necessary
232 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
// Copying push: packages the element address into a cpq_operation and hands it
// to the aggregator, which executes it (possibly batched with other threads'
// operations) on one thread. Blocks until the operation completes.
233 void push(const_reference elem) {
234 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
// Reject move-only element types at compile time for the copying overload.
235 __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
// NOTE(review): the matching `#endif` (original line 236) appears elided.
237 cpq_operation op_data(elem, PUSH_OP);
238 my_aggregator.execute(&op_data);
// FAILED status is how the handler thread reports an exception (e.g. bad_alloc
// during vector growth) back to this, the requesting, thread.
239 if (op_data.status == FAILED) // exception thrown
240 throw_exception(eid_bad_alloc);
// NOTE(review): closing brace elided.
243 #if __TBB_CPP11_RVALUE_REF_PRESENT
244 //! Pushes elem onto the queue, increasing capacity of queue if necessary
245 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
// Moving push: PUSH_RVALUE_OP tells handle_operations to move from the element
// rather than copy it. The operation blocks until the handler completes it.
246 void push(value_type &&elem) {
247 cpq_operation op_data(elem, PUSH_RVALUE_OP);
248 my_aggregator.execute(&op_data);
249 if (op_data.status == FAILED) // exception thrown
250 throw_exception(eid_bad_alloc);
// NOTE(review): closing brace elided.
253 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
254 //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
255 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
// Constructs a temporary from args, then forwards it to the rvalue push above.
256 template<typename... Args>
257 void emplace(Args&&... args) {
258 push(value_type(std::forward<Args>(args)...));
// NOTE(review): closing brace elided.
260 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
261 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
263 //! Gets a reference to and removes highest priority element
264 /** If a highest priority element was found, sets elem and returns true,
265 otherwise returns false.
266 This operation can be safely used concurrently with other push, try_pop or emplace operations. */
// Sends a POP_OP through the aggregator; the handler thread writes the popped
// value through op_data.elem. Returns false when the queue was empty.
267 bool try_pop(reference elem) {
268 cpq_operation op_data(POP_OP);
269 op_data.elem = &elem;
270 my_aggregator.execute(&op_data);
271 return op_data.status==SUCCEEDED;
// NOTE(review): closing brace elided.
274 //! Clear the queue; not thread-safe
275 /** This operation is unsafe if there are pending concurrent operations on the queue.
276 Resets size, effectively emptying queue; does not free space.
277 May not clear elements added in pending operations. */
// NOTE(review): the entire definition of clear() (original lines 278-282)
// appears elided here — only its documentation comment remains. Restore from
// upstream TBB.
284 //! Swap this queue with another; not thread-safe
285 /** This operation is unsafe if there are pending concurrent operations on the queue. */
286 void swap(concurrent_priority_queue& q) {
// NOTE(review): lines elided (original 287-289, presumably `data.swap(q.data);`
// and a swap of `mark`, plus a using-declaration making unqualified `swap`
// resolve) — the visible call below relies on some swap being in scope.
290 swap(my_size, q.my_size);
// NOTE(review): closing brace elided.
293 //! Return allocator object
// Returns a copy of the allocator used by the underlying storage vector.
294 allocator_type get_allocator() const { return data.get_allocator(); }
// Operation tags carried by cpq_operation: what the requesting thread asked
// the aggregator's handler thread to do on its behalf.
297 enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
// WAIT: requester still blocked; SUCCEEDED/FAILED: handler's published result.
298 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
// One queued request node in the aggregator's operation list.
300 class cpq_operation : public aggregated_operation<cpq_operation> {
// NOTE(review): lines elided (original 301-306, presumably `public:` plus the
// `operation_type type;` and `value_type *elem;` members referenced below).
// const_cast is safe here: the pointee is only read for PUSH_OP; for
// PUSH_RVALUE_OP the caller passed an rvalue it owns.
307 cpq_operation(const_reference e, operation_type t) :
308 type(t), elem(const_cast<value_type*>(&e)) {}
309 cpq_operation(operation_type t) : type(t) {}
// NOTE(review): closing brace of cpq_operation and the header of my_functor_t
// (original 310-312, a small functor class) appear elided.
// Functor handed to the aggregator; forwards each batch to the owning queue.
313 concurrent_priority_queue<T, Compare, A> *cpq;
// NOTE(review): lines elided (original 314-315, likely `public:` and a default
// constructor).
316 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
317 void operator()(cpq_operation* op_list) {
318 cpq->handle_operations(op_list);
// NOTE(review): closing braces of operator() and my_functor_t elided.
322 typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
323 aggregator_t my_aggregator;
324 //! Padding added to avoid false sharing
// Keeps the aggregator on its own cache line, away from mark/my_size below.
325 char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
326 //! The point at which unsorted elements begin
// NOTE(review): the declaration `size_type mark;` (original 327) appears
// elided — the mark member is used throughout handle_operations/heapify.
328 __TBB_atomic size_type my_size;
// NOTE(review): the declaration `Compare compare;` (original 329) appears
// elided — padding2 below sizes itself against Compare.
330 //! Padding added to avoid false sharing
331 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
332 //! Storage for the heap of elements in queue, plus unheapified elements
333 /** data has the following structure:
340 [_|...|_|_|...|_| |...| ]
346 Thus, data stores the binary heap starting at position 0 through
347 mark-1 (it may be empty). Then there are 0 or more elements
348 that have not yet been inserted into the heap, in positions
349 mark through my_size-1. */
350 typedef std::vector<value_type, allocator_type> vector_t;
// NOTE(review): the member declaration `vector_t data;` (original ~351)
// appears elided.
// Aggregator handler: executed by exactly one thread at a time over the batch
// of pending operations in op_list. Two passes: (1) apply all pushes and any
// pops satisfiable in O(1) from the unsorted tail, deferring the rest to
// pop_list; (2) service deferred pops from the heap, then re-heapify the tail.
353 void handle_operations(cpq_operation *op_list) {
354 cpq_operation *tmp, *pop_list=NULL;
// Invariant on entry: every element is heapified (no unsorted tail).
356 __TBB_ASSERT(mark == data.size(), NULL);
358 // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
// NOTE(review): the first pass's loop header (original ~359, presumably
// `while (op_list) {`) appears elided.
360 // ITT note: &(op_list->status) tag is used to cover accesses to op_list
361 // node. This thread is going to handle the operation, and so will acquire it
362 // and perform the associated operation w/o triggering a race condition; the
363 // thread that created the operation is waiting on the status field, so when
364 // this thread is done with the operation, it will perform a
365 // store_with_release to give control back to the waiting thread in
366 // aggregator::insert_operation.
367 call_itt_notify(acquired, &(op_list->status));
368 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
// NOTE(review): a line elided (original 369, presumably `tmp = op_list;`) —
// tmp is used below but its assignment is not visible.
// Advance the list head before tmp->next may be repurposed below.
370 op_list = itt_hide_load_word(op_list->next);
371 if (tmp->type == POP_OP) {
// Fast-path pop: if an unsorted tail element outranks the heap root, hand the
// *last* element straight to the requester — O(1), no heap repair needed.
372 if (mark < data.size() &&
373 compare(data[0], data[data.size()-1])) {
374 // there are newly pushed elems and the last one
375 // is higher than top
376 *(tmp->elem) = move(data[data.size()-1]);
377 __TBB_store_with_release(my_size, my_size-1);
378 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
// NOTE(review): a line elided (original 379, presumably `data.pop_back();`) —
// the assert below checks the post-shrink invariant.
380 __TBB_ASSERT(mark<=data.size(), NULL);
// NOTE(review): a closing brace (original 381) elided before the else.
382 else { // no convenient item to pop; postpone
// Thread the deferred pop onto pop_list by reusing the operation's next link.
383 itt_hide_store_word(tmp->next, pop_list);
// NOTE(review): lines elided (original 384-385, presumably `pop_list = tmp;`
// and a closing brace).
386 } else { // PUSH_OP or PUSH_RVALUE_OP
387 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
// NOTE(review): a line elided (original 388, presumably `__TBB_TRY {` — the
// FAILED store below only makes sense as the matching catch handler).
389 if (tmp->type == PUSH_OP) {
// Dispatch on copy-constructibility: copies for PUSH_OP, or compile-time/
// runtime rejection for move-only types (see push_back_helper).
390 push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
// NOTE(review): lines elided (original 391/393 — a closing brace and `else {`
// introducing the move branch, and its closing brace).
392 data.push_back(move(*(tmp->elem)));
394 __TBB_store_with_release(my_size, my_size + 1);
395 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
// NOTE(review): lines elided (original 396, presumably `} __TBB_CATCH(...) {`).
// FAILED tells the blocked pusher to throw bad_alloc on its own thread.
397 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
// NOTE(review): lines elided (original 398-401 — closing braces for the catch,
// the push branch, and the first-pass while loop).
402 // second pass processes pop operations
// NOTE(review): the second pass's loop header and `tmp = pop_list;` (original
// 403-404) appear elided.
405 pop_list = itt_hide_load_word(pop_list->next);
406 __TBB_ASSERT(tmp->type == POP_OP, NULL);
// NOTE(review): a line elided (original 407, presumably `if (data.empty()) {`
// guarding the FAILED store below).
408 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
// NOTE(review): lines elided (original 409-410, presumably `}` / `else {`).
411 __TBB_ASSERT(mark<=data.size(), NULL);
// Same O(1) fast path as the first pass: serve from the unsorted tail if its
// last element outranks the heap root.
412 if (mark < data.size() &&
413 compare(data[0], data[data.size()-1])) {
414 // there are newly pushed elems and the last one is
// NOTE(review): a comment continuation line (original 415) elided.
416 *(tmp->elem) = move(data[data.size()-1]);
417 __TBB_store_with_release(my_size, my_size-1);
418 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
// NOTE(review): lines elided (original 419-420, presumably `data.pop_back();`
// and a closing brace).
421 else { // extract top and push last element down heap
422 *(tmp->elem) = move(data[0]);
423 __TBB_store_with_release(my_size, my_size-1);
424 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
// NOTE(review): lines elided (original 425-429, presumably a reheap() call and
// closing braces for the else/outer-else/while).
430 // heapify any leftover pushed elements before doing the next
431 // batch of operations
432 if (mark<data.size()) heapify();
// Invariant restored for the next batch.
433 __TBB_ASSERT(mark == data.size(), NULL);
// NOTE(review): closing brace of handle_operations (original ~434) elided.
436 //! Merge unsorted elements into heap
// Incrementally sift each unsorted tail element (positions mark..size-1) up
// into the binary heap; on return, mark == data.size().
// NOTE(review): the function header line (original 437, `void heapify() {`)
// appears elided.
// A single element is trivially a heap; start the merge at index 1.
438 if (!mark && data.size()>0) mark = 1;
439 for (; mark<data.size(); ++mark) {
440 // for each unheapified element under size
441 size_type cur_pos = mark;
442 value_type to_place = move(data[mark]);
443 do { // push to_place up the heap
// Parent index in the implicit binary heap layout.
444 size_type parent = (cur_pos-1)>>1;
// Stop when the parent does not rank below the element being placed.
445 if (!compare(data[parent], to_place)) break;
446 data[cur_pos] = move(data[parent]);
// NOTE(review): lines elided (original 447-448, presumably `cur_pos = parent;`
// and the do-while's `} while( cur_pos );` terminator) — confirm against
// upstream TBB.
449 data[cur_pos] = move(to_place);
// NOTE(review): closing braces of the for loop and of heapify (original
// 450-451) elided.
453 //! Re-heapify after an extraction
454 /** Re-heapify by pushing last element down the heap from the root. */
// NOTE(review): the function header line (original 455, presumably
// `void reheap() {`) appears elided.
456 size_type cur_pos=0, child=1;
// Only walk within the heapified prefix [0, mark).
458 while (child < mark) {
459 size_type target = child;
// Pick the higher-priority of the two children as the promotion candidate.
460 if (child+1 < mark && compare(data[child], data[child+1]))
// NOTE(review): a line elided (original 461, presumably `++target;`).
462 // target now has the higher priority child
// If the element being sunk (the vector's last element) outranks the best
// child, it belongs at cur_pos — stop sinking.
463 if (compare(data[target], data[data.size()-1])) break;
464 data[cur_pos] = move(data[target]);
// NOTE(review): a line elided (original 465, presumably `cur_pos = target;`).
466 child = (cur_pos<<1)+1;
// NOTE(review): the while loop's closing brace (original 467) elided.
// Place the sunk element unless it is the slot being vacated anyway.
468 if (cur_pos != data.size()-1)
469 data[cur_pos] = move(data[data.size()-1]);
// NOTE(review): a line elided (original 470, presumably `data.pop_back();`).
// Clamp mark after the container shrank.
471 if (mark > data.size()) mark = data.size();
// NOTE(review): the function's closing brace (original 472) elided.
// Tag-dispatched append used by handle_operations for copying pushes:
// true_type => T is copy-constructible, actually append a copy.
474 void push_back_helper(const T& t, tbb::internal::true_type) {
// NOTE(review): lines elided (original 475-477, presumably
// `data.push_back(t);` and the closing brace).
// false_type => T is move-only; a copying push is a programming error. This
// overload exists so the template still compiles pre-C++11.
478 void push_back_helper(const T&, tbb::internal::false_type) {
479 __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
// NOTE(review): the closing brace (original 480) elided.
483 } // namespace interface5
485 using interface5::concurrent_priority_queue;
489 #endif /* __TBB_concurrent_priority_queue_H */