X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=tbb%2Finclude%2Ftbb%2Fconcurrent_priority_queue.h;h=fe1839a779a9403321334c35fae5e3d8317cc3a0;hb=4c96b4064a92f65beead3cb2453d727187bed40f;hp=8a9f252b660309e35c287203620a01749e15da15;hpb=46ab0514ba58ee00183ff0584c7ea7c9e3d76494;p=casparcg diff --git a/tbb/include/tbb/concurrent_priority_queue.h b/tbb/include/tbb/concurrent_priority_queue.h index 8a9f252b6..fe1839a77 100644 --- a/tbb/include/tbb/concurrent_priority_queue.h +++ b/tbb/include/tbb/concurrent_priority_queue.h @@ -38,7 +38,7 @@ #include "tbb_exception.h" #include "tbb_stddef.h" #include "tbb_profiling.h" -#include "_aggregator_internal.h" +#include "internal/_aggregator_impl.h" #include #include #include @@ -71,64 +71,71 @@ class concurrent_priority_queue { typedef A allocator_type; //! Constructs a new concurrent_priority_queue with default capacity - explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) { + explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a) + { my_aggregator.initialize_handler(my_functor_t(this)); } //! Constructs a new concurrent_priority_queue with init_sz capacity - explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) { + explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : + mark(0), my_size(0), data(a) + { data.reserve(init_capacity); my_aggregator.initialize_handler(my_functor_t(this)); } //! [begin,end) constructor template - concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a) + concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : + data(begin, end, a) { mark = 0; my_aggregator.initialize_handler(my_functor_t(this)); heapify(); + my_size = data.size(); } //! Copy constructor - /** State of this queue may not reflect results of pending - operations on the copied queue. */ - explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator()) + /** This operation is unsafe if there are pending concurrent operations on the src queue. */ + explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), + my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator()) { my_aggregator.initialize_handler(my_functor_t(this)); heapify(); } - concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a) + //! Copy constructor with specific allocator + /** This operation is unsafe if there are pending concurrent operations on the src queue. */ + concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), + my_size(src.my_size), data(src.data.begin(), src.data.end(), a) { my_aggregator.initialize_handler(my_functor_t(this)); heapify(); } //! Assignment operator - /** State of this queue may not reflect results of pending - operations on the copied queue. */ + /** This operation is unsafe if there are pending concurrent operations on the src queue. */ concurrent_priority_queue& operator=(const concurrent_priority_queue& src) { if (this != &src) { std::vector(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data); mark = src.mark; + my_size = src.my_size; } return *this; } //! Returns true if empty, false otherwise - /** Returned value may not reflect results of pending operations. */ - bool empty() const { return data.empty(); } + /** Returned value may not reflect results of pending operations. + This operation reads shared data and will trigger a race condition. */ + bool empty() const { return size()==0; } //! Returns the current number of elements contained in the queue - /** Returned value may not reflect results of pending operations. */ - size_type size() const { return data.size(); } - - //! Returns the current capacity (i.e. allocated storage) of the queue - /** Returned value may not reflect results of pending operations. */ - size_type capacity() const { return data.capacity(); } + /** Returned value may not reflect results of pending operations. + This operation reads shared data and will trigger a race condition. */ + size_type size() const { return __TBB_load_with_acquire(my_size); } //! Pushes elem onto the queue, increasing capacity of queue if necessary + /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */ void push(const_reference elem) { cpq_operation op_data(elem, PUSH_OP); my_aggregator.execute(&op_data); @@ -138,7 +145,8 @@ class concurrent_priority_queue { //! Gets a reference to and removes highest priority element /** If a highest priority element was found, sets elem and returns true, - otherwise returns false. */ + otherwise returns false. + This operation can be safely used concurrently with other push, try_pop or reserve operations. */ bool try_pop(reference elem) { cpq_operation op_data(POP_OP); op_data.elem = &elem; @@ -146,39 +154,29 @@ class concurrent_priority_queue { return op_data.status==SUCCEEDED; } - //! If current capacity is less than new_cap, increases capacity to new_cap - void reserve(size_type new_cap) { - cpq_operation op_data(RESERVE_OP); - op_data.sz = new_cap; - my_aggregator.execute(&op_data); - if (op_data.status == FAILED) // exception thrown - throw_exception(eid_bad_alloc); - } - //! Clear the queue; not thread-safe - /** Resets size, effectively emptying queue; does not free space. + /** This operation is unsafe if there are pending concurrent operations on the queue. + Resets size, effectively emptying queue; does not free space. May not clear elements added in pending operations. */ void clear() { data.clear(); mark = 0; - } - - //! Shrink queue capacity to current contents; not thread-safe - void shrink_to_fit() { - std::vector(data.begin(), data.end(), data.get_allocator()).swap(data); + my_size = 0; } //! Swap this queue with another; not thread-safe + /** This operation is unsafe if there are pending concurrent operations on the queue. */ void swap(concurrent_priority_queue& q) { data.swap(q.data); std::swap(mark, q.mark); + std::swap(my_size, q.my_size); } //! Return allocator object allocator_type get_allocator() const { return data.get_allocator(); } private: - enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP}; + enum operation_type {INVALID_OP, PUSH_OP, POP_OP}; enum operation_status { WAIT=0, SUCCEEDED, FAILED }; class cpq_operation : public aggregated_operation { @@ -208,9 +206,10 @@ class concurrent_priority_queue { char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)]; //! The point at which unsorted elements begin size_type mark; + __TBB_atomic size_type my_size; Compare compare; //! Padding added to avoid false sharing - char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)]; + char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)]; //! Storage for the heap of elements in queue, plus unheapified elements /** data has the following structure: @@ -222,14 +221,13 @@ class concurrent_priority_queue { [_|...|_|_|...|_| |...| ] 0 ^ ^ ^ | | |__capacity - | |__size + | |__my_size |__mark - Thus, data stores the binary heap starting at position 0 through - mark-1 (it may be empty). Then there are 0 or more elements - that have not yet been inserted into the heap, in positions - mark through size-1. */ + mark-1 (it may be empty). Then there are 0 or more elements + that have not yet been inserted into the heap, in positions + mark through my_size-1. */ std::vector data; void handle_operations(cpq_operation *op_list) { @@ -254,17 +252,20 @@ class concurrent_priority_queue { if (tmp->type == PUSH_OP) { __TBB_TRY { data.push_back(*(tmp->elem)); + __TBB_store_with_release(my_size, my_size+1); itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); } __TBB_CATCH(...) { itt_store_word_with_release(tmp->status, uintptr_t(FAILED)); } } - else if (tmp->type == POP_OP) { + else { // tmp->type == POP_OP + __TBB_ASSERT(tmp->type == POP_OP, NULL); if (mark < data.size() && compare(data[0], data[data.size()-1])) { // there are newly pushed elems and the last one // is higher than top *(tmp->elem) = data[data.size()-1]; // copy the data + __TBB_store_with_release(my_size, my_size-1); itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); data.pop_back(); __TBB_ASSERT(mark<=data.size(), NULL); @@ -274,15 +275,6 @@ class concurrent_priority_queue { pop_list = tmp; } } - else { - __TBB_ASSERT(tmp->type == RESERVE_OP, NULL); - __TBB_TRY { - data.reserve(tmp->sz); - itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); - } __TBB_CATCH(...) { - itt_store_word_with_release(tmp->status, uintptr_t(FAILED)); - } - } } // second pass processes pop operations @@ -300,11 +292,13 @@ class concurrent_priority_queue { // there are newly pushed elems and the last one is // higher than top *(tmp->elem) = data[data.size()-1]; // copy the data + __TBB_store_with_release(my_size, my_size-1); itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); data.pop_back(); } else { // extract top and push last element down heap *(tmp->elem) = data[0]; // copy the data + __TBB_store_with_release(my_size, my_size-1); itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); reheap(); } @@ -319,7 +313,7 @@ class concurrent_priority_queue { //! Merge unsorted elements into heap void heapify() { - if (!mark) mark = 1; + if (!mark && data.size()>0) mark = 1; for (; mark