2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
25 #include <initializer_list>
27 #include <tbb/concurrent_queue.h>
29 #include "semaphore.h"
34 * Blocking concurrent priority queue supporting a predefined set of
35 * priorities. FIFO ordering is guaranteed within a priority.
37 * Prio must have the < and > operators defined where a larger instance is of a
40 template <class T, class Prio>
41 class blocking_priority_queue
44 typedef unsigned int size_type;
46 std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
48 semaphore space_available_ { capacity_ };
49 semaphore elements_available_ { 0u };
50 mutable std::mutex capacity_mutex_;
55 * @param capacity The initial capacity of the queue.
56 * @param priorities A forward iterable range with the priorities to
59 template<class PrioList>
60 blocking_priority_queue(size_type capacity, const PrioList& priorities)
63 for (Prio priority : priorities)
65 queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
68 // The std::map is read-only from now on, so there *should* (it is
69 // unlikely but possible for a std::map implementor to choose to
70 // rebalance the tree during read operations) be no race conditions
73 // This may be true for vc10 as well:
74 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
78 * Push an element with a given priority to the queue. Blocks until room
81 * @param priority The priority of the element.
82 * @param element The element.
84 void push(Prio priority, const T& element)
86 acquire_transaction transaction(space_available_);
88 push_acquired(priority, element, transaction);
92 * Attempt to push an element with a given priority to the queue. Will
93 * immediately return even if there is no room in the queue.
95 * @param priority The priority of the element.
96 * @param element The element.
98 * @return true if the element was pushed. false if there was no room.
100 bool try_push(Prio priority, const T& element)
102 if (!space_available_.try_acquire())
105 acquire_transaction transaction(space_available_, true);
107 push_acquired(priority, element, transaction);
113 * Pop the element with the highest priority (fifo for elements with the
114 * same priority). Blocks until an element is available.
116 * @param element The element to store the result in.
120 acquire_transaction transaction(elements_available_);
122 pop_acquired_any_priority(element, transaction);
126 * Attempt to pop the element with the highest priority (fifo for elements
127 * with the same priority) if available. Does not wait until an element is
130 * @param element The element to store the result in.
132 * @return true if an element was available.
134 bool try_pop(T& element)
136 if (!elements_available_.try_acquire())
139 acquire_transaction transaction(elements_available_, true);
141 pop_acquired_any_priority(element, transaction);
147 * Attempt to pop the element with the highest priority (fifo for elements
148 * with the same priority) if available *and* has a minimum priority. Does
149 * not wait until an element satisfying the priority criteria is available.
151 * @param element The element to store the result in.
152 * @param minimum_priority The minimum priority to accept.
154 * @return true if an element was available with the minimum priority.
156 bool try_pop(T& element, Prio minimum_priority)
158 if (!elements_available_.try_acquire())
161 acquire_transaction transaction(elements_available_, true);
163 for (auto& queue : queues_by_priority_)
165 if (queue.first < minimum_priority)
167 // Will be true for all queues from this point so we break.
171 if (queue.second.try_pop(element))
173 transaction.commit();
174 space_available_.release();
184 * Modify the capacity of the queue. May block if reducing the capacity.
186 * @param capacity The new capacity.
188 void set_capacity(size_type capacity)
190 std::unique_lock<std::mutex> lock(capacity_mutex_);
192 if (capacity_ < capacity)
194 auto to_grow_with = capacity - capacity_;
195 space_available_.release(to_grow_with);
197 else if (capacity_ > capacity)
199 auto to_shrink_with = capacity_ - capacity;
200 // Will block until the desired capacity has been reached.
201 space_available_.acquire(to_shrink_with);
204 capacity_ = capacity;
208 * @return the current capacity of the queue.
210 size_type capacity() const
212 std::unique_lock<std::mutex> lock (capacity_mutex_);
218 * @return the current size of the queue (may have changed at the time of
221 size_type size() const
223 return elements_available_.permits();
227 * @return the current available space in the queue (may have changed at
228 * the time of returning).
230 size_type space_available() const
232 return space_available_.permits();
235 void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
239 queues_by_priority_.at(priority).push(element);
241 catch (std::out_of_range&)
243 throw std::runtime_error("Priority not supported by queue");
246 transaction.commit();
247 elements_available_.release();
250 void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
252 for (auto& queue : queues_by_priority_)
254 if (queue.second.try_pop(element))
256 transaction.commit();
257 space_available_.release();
263 throw std::logic_error(
264 "blocking_priority_queue should have contained at least one element but didn't");