/*
- Copyright 2005-2011 Intel Corporation. All Rights Reserved.
-
- This file is part of Threading Building Blocks.
-
- Threading Building Blocks is free software; you can redistribute it
- and/or modify it under the terms of the GNU General Public License
- version 2 as published by the Free Software Foundation.
-
- Threading Building Blocks is distributed in the hope that it will be
- useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with Threading Building Blocks; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-
- As a special exception, you may use this file as part of a free software
- library without restriction. Specifically, if other files instantiate
- templates or use macros or inline functions from this file, or you compile
- this file and link it with other files to produce an executable, this
- file does not by itself cause the resulting executable to be covered by
- the GNU General Public License. This exception does not however
- invalidate any other reasons why the executable file might be covered by
- the GNU General Public License.
+ Copyright 2005-2014 Intel Corporation. All Rights Reserved.
+
+ This file is part of Threading Building Blocks. Threading Building Blocks is free software;
+ you can redistribute it and/or modify it under the terms of the GNU General Public License
+ version 2 as published by the Free Software Foundation. Threading Building Blocks is
+ distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details. You should have received a copy of
+ the GNU General Public License along with Threading Building Blocks; if not, write to the
+ Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+ As a special exception, you may use this file as part of a free software library without
+ restriction. Specifically, if other files instantiate templates or use macros or inline
+ functions from this file, or you compile this file and link it with other files to produce
+ an executable, this file does not by itself cause the resulting executable to be covered
+ by the GNU General Public License. This exception does not however invalidate any other
+ reasons why the executable file might be covered by the GNU General Public License.
*/
#ifndef __TBB_concurrent_priority_queue_H
#include <iterator>
#include <functional>
+#if __TBB_INITIALIZER_LISTS_PRESENT
+ #include <initializer_list>
+#endif
+
+#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
+ #include <type_traits>
+#endif
+
namespace tbb {
namespace interface5 {
+namespace internal {
+#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
+ template<typename T, bool C = std::is_copy_constructible<T>::value>
+ struct use_element_copy_constructor {
+ typedef tbb::internal::true_type type;
+ };
+ template<typename T>
+ struct use_element_copy_constructor <T,false> {
+ typedef tbb::internal::false_type type;
+ };
+#else
+ template<typename>
+ struct use_element_copy_constructor {
+ typedef tbb::internal::true_type type;
+ };
+#endif
+} // namespace internal
using namespace tbb::internal;
//! [begin,end) constructor
template<typename InputIterator>
concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
- data(begin, end, a)
+ mark(0), data(begin, end, a)
+ {
+ my_aggregator.initialize_handler(my_functor_t(this));
+ heapify();
+ my_size = data.size();
+ }
+
+#if __TBB_INITIALIZER_LISTS_PRESENT
+ //! Constructor from std::initializer_list
+ concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
+ mark(0),data(init_list.begin(), init_list.end(), a)
{
- mark = 0;
my_aggregator.initialize_handler(my_functor_t(this));
heapify();
my_size = data.size();
}
+#endif //# __TBB_INITIALIZER_LISTS_PRESENT
//! Copy constructor
/** This operation is unsafe if there are pending concurrent operations on the src queue. */
/** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
if (this != &src) {
- std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
+ vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
+ mark = src.mark;
+ my_size = src.my_size;
+ }
+ return *this;
+ }
+
+#if __TBB_CPP11_RVALUE_REF_PRESENT
+ //! Move constructor
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+ concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
+ my_size(src.my_size), data(std::move(src.data))
+ {
+ my_aggregator.initialize_handler(my_functor_t(this));
+ }
+
+ //! Move constructor with specific allocator
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+ concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
+ my_size(src.my_size),
+#if __TBB_ALLOCATOR_TRAITS_PRESENT
+ data(std::move(src.data), a)
+#else
+ // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
+ // It seems that the reason is absence of support of allocator_traits (stateful allocators).
+ data(a)
+#endif //__TBB_ALLOCATOR_TRAITS_PRESENT
+ {
+ my_aggregator.initialize_handler(my_functor_t(this));
+#if !__TBB_ALLOCATOR_TRAITS_PRESENT
+ if (a != src.data.get_allocator()){
+ data.reserve(src.data.size());
+ data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
+ }else{
+ data = std::move(src.data);
+ }
+#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
+ }
+
+ //! Move assignment operator
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+ concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
+ if (this != &src) {
mark = src.mark;
my_size = src.my_size;
+#if !__TBB_ALLOCATOR_TRAITS_PRESENT
+ if (data.get_allocator() != src.data.get_allocator()){
+ vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
+ }else
+#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
+ {
+ data = std::move(src.data);
+ }
}
return *this;
}
+#endif //__TBB_CPP11_RVALUE_REF_PRESENT
+
+ //! Assign the queue from [begin,end) range, not thread-safe
+ template<typename InputIterator>
+ void assign(InputIterator begin, InputIterator end) {
+ vector_t(begin, end, data.get_allocator()).swap(data);
+ mark = 0;
+ my_size = data.size();
+ heapify();
+ }
+
+#if __TBB_INITIALIZER_LISTS_PRESENT
+ //! Assign the queue from std::initializer_list, not thread-safe
+ void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
+
+ //! Assign from std::initializer_list, not thread-safe
+ concurrent_priority_queue& operator=(std::initializer_list<T> il) {
+ this->assign(il.begin(), il.end());
+ return *this;
+ }
+#endif //# __TBB_INITIALIZER_LISTS_PRESENT
//! Returns true if empty, false otherwise
/** Returned value may not reflect results of pending operations.
size_type size() const { return __TBB_load_with_acquire(my_size); }
//! Pushes elem onto the queue, increasing capacity of queue if necessary
- /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
+ /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
void push(const_reference elem) {
+#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
+ __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
+#endif
cpq_operation op_data(elem, PUSH_OP);
my_aggregator.execute(&op_data);
if (op_data.status == FAILED) // exception thrown
throw_exception(eid_bad_alloc);
}
+#if __TBB_CPP11_RVALUE_REF_PRESENT
+ //! Pushes elem onto the queue, increasing capacity of queue if necessary
+ /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
+ void push(value_type &&elem) {
+ cpq_operation op_data(elem, PUSH_RVALUE_OP);
+ my_aggregator.execute(&op_data);
+ if (op_data.status == FAILED) // exception thrown
+ throw_exception(eid_bad_alloc);
+ }
+
+#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
+ //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
+ /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
+ template<typename... Args>
+ void emplace(Args&&... args) {
+ push(value_type(std::forward<Args>(args)...));
+ }
+#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
+#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
+
//! Gets a reference to and removes highest priority element
/** If a highest priority element was found, sets elem and returns true,
otherwise returns false.
- This operation can be safely used concurrently with other push, try_pop or reserve operations. */
+ This operation can be safely used concurrently with other push, try_pop or emplace operations. */
bool try_pop(reference elem) {
cpq_operation op_data(POP_OP);
op_data.elem = &elem;
//! Swap this queue with another; not thread-safe
/** This operation is unsafe if there are pending concurrent operations on the queue. */
void swap(concurrent_priority_queue& q) {
+ using std::swap;
data.swap(q.data);
- std::swap(mark, q.mark);
- std::swap(my_size, q.my_size);
+ swap(mark, q.mark);
+ swap(my_size, q.my_size);
}
//! Return allocator object
allocator_type get_allocator() const { return data.get_allocator(); }
private:
- enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
+ enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
enum operation_status { WAIT=0, SUCCEEDED, FAILED };
class cpq_operation : public aggregated_operation<cpq_operation> {
}
};
- aggregator< my_functor_t, cpq_operation> my_aggregator;
+ typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
+ aggregator_t my_aggregator;
//! Padding added to avoid false sharing
- char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
+ char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
//! The point at which unsorted elements begin
size_type mark;
__TBB_atomic size_type my_size;
mark-1 (it may be empty). Then there are 0 or more elements
that have not yet been inserted into the heap, in positions
mark through my_size-1. */
- std::vector<value_type, allocator_type> data;
+ typedef std::vector<value_type, allocator_type> vector_t;
+ vector_t data;
void handle_operations(cpq_operation *op_list) {
cpq_operation *tmp, *pop_list=NULL;
__TBB_ASSERT(mark == data.size(), NULL);
- // first pass processes all constant time operations: pushes,
- // tops, some pops. Also reserve.
+ // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
while (op_list) {
// ITT note: &(op_list->status) tag is used to cover accesses to op_list
// node. This thread is going to handle the operation, and so will acquire it
__TBB_ASSERT(op_list->type != INVALID_OP, NULL);
tmp = op_list;
op_list = itt_hide_load_word(op_list->next);
- if (tmp->type == PUSH_OP) {
- __TBB_TRY {
- data.push_back(*(tmp->elem));
- __TBB_store_with_release(my_size, my_size+1);
- itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
- } __TBB_CATCH(...) {
- itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
- }
- }
- else { // tmp->type == POP_OP
- __TBB_ASSERT(tmp->type == POP_OP, NULL);
+ if (tmp->type == POP_OP) {
if (mark < data.size() &&
compare(data[0], data[data.size()-1])) {
// there are newly pushed elems and the last one
// is higher than top
- *(tmp->elem) = data[data.size()-1]; // copy the data
+ *(tmp->elem) = move(data[data.size()-1]);
__TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
data.pop_back();
itt_hide_store_word(tmp->next, pop_list);
pop_list = tmp;
}
+ } else { // PUSH_OP or PUSH_RVALUE_OP
+ __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
+ __TBB_TRY{
+ if (tmp->type == PUSH_OP) {
+ push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
+ } else {
+ data.push_back(move(*(tmp->elem)));
+ }
+ __TBB_store_with_release(my_size, my_size + 1);
+ itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
+ } __TBB_CATCH(...) {
+ itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
+ }
}
}
compare(data[0], data[data.size()-1])) {
// there are newly pushed elems and the last one is
// higher than top
- *(tmp->elem) = data[data.size()-1]; // copy the data
+ *(tmp->elem) = move(data[data.size()-1]);
__TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
data.pop_back();
}
else { // extract top and push last element down heap
- *(tmp->elem) = data[0]; // copy the data
+ *(tmp->elem) = move(data[0]);
__TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
reheap();
for (; mark<data.size(); ++mark) {
// for each unheapified element under size
size_type cur_pos = mark;
- value_type to_place = data[mark];
+ value_type to_place = move(data[mark]);
do { // push to_place up the heap
size_type parent = (cur_pos-1)>>1;
if (!compare(data[parent], to_place)) break;
- data[cur_pos] = data[parent];
+ data[cur_pos] = move(data[parent]);
cur_pos = parent;
} while( cur_pos );
- data[cur_pos] = to_place;
+ data[cur_pos] = move(to_place);
}
}
++target;
// target now has the higher priority child
if (compare(data[target], data[data.size()-1])) break;
- data[cur_pos] = data[target];
+ data[cur_pos] = move(data[target]);
cur_pos = target;
child = (cur_pos<<1)+1;
}
- data[cur_pos] = data[data.size()-1];
+ if (cur_pos != data.size()-1)
+ data[cur_pos] = move(data[data.size()-1]);
data.pop_back();
if (mark > data.size()) mark = data.size();
}
+
+ void push_back_helper(const T& t, tbb::internal::true_type) {
+ data.push_back(t);
+ }
+
+ void push_back_helper(const T&, tbb::internal::false_type) {
+ __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
+ }
};
} // namespace interface5