2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
25 #include <initializer_list>
27 #include <tbb/concurrent_queue.h>
29 #include <boost/thread/mutex.hpp>
31 #include "semaphore.h"
36 * Blocking concurrent priority queue supporting a predefined set of
37 * priorities. FIFO ordering is guaranteed within a priority.
39 * Prio must have the < and > operators defined where a larger instance is of a
42 template <class T, class Prio>
43 class blocking_priority_queue
46 typedef unsigned int size_type;
48 std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
50 semaphore space_available_ = capacity_;
51 semaphore elements_available_ = 0u;
52 mutable boost::mutex capacity_mutex_;
57 * @param capacity The initial capacity of the queue.
58 * @param priorities A forward iterable range with the priorities to
61 template<class PrioList>
62 blocking_priority_queue(size_type capacity, const PrioList& priorities)
65 for (Prio priority : priorities)
67 queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
70 // The std::map is read-only from now on, so there *should* (it is
71 // unlikely but possible for a std::map implementor to choose to
72 // rebalance the tree during read operations) be no race conditions
75 // This may be true for vc10 as well:
76 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
80 * Push an element with a given priority to the queue. Blocks until room
83 * @param priority The priority of the element.
84 * @param element The element.
86 void push(Prio priority, const T& element)
88 acquire_transaction transaction(space_available_);
90 push_acquired(priority, element, transaction);
94 * Attempt to push an element with a given priority to the queue. Will
95 * immediately return even if there is no room in the queue.
97 * @param priority The priority of the element.
98 * @param element The element.
100 * @return true if the element was pushed. false if there was no room.
102 bool try_push(Prio priority, const T& element)
104 if (!space_available_.try_acquire())
107 acquire_transaction transaction(space_available_, true);
109 push_acquired(priority, element, transaction);
115 * Pop the element with the highest priority (fifo for elements with the
116 * same priority). Blocks until an element is available.
118 * @param element The element to store the result in.
122 acquire_transaction transaction(elements_available_);
124 pop_acquired_any_priority(element, transaction);
128 * Attempt to pop the element with the highest priority (fifo for elements
129 * with the same priority) if available. Does not wait until an element is
132 * @param element The element to store the result in.
134 * @return true if an element was available.
136 bool try_pop(T& element)
138 if (!elements_available_.try_acquire())
141 acquire_transaction transaction(elements_available_, true);
143 pop_acquired_any_priority(element, transaction);
149 * Attempt to pop the element with the highest priority (fifo for elements
150 * with the same priority) if available *and* has a minimum priority. Does
151 * not wait until an element satisfying the priority criteria is available.
153 * @param element The element to store the result in.
154 * @param minimum_priority The minimum priority to accept.
156 * @return true if an element was available with the minimum priority.
158 bool try_pop(T& element, Prio minimum_priority)
160 if (!elements_available_.try_acquire())
163 acquire_transaction transaction(elements_available_, true);
165 for (auto& queue : queues_by_priority_)
167 if (queue.first < minimum_priority)
169 // Will be true for all queues from this point so we break.
173 if (queue.second.try_pop(element))
175 transaction.commit();
176 space_available_.release();
186 * Modify the capacity of the queue. May block if reducing the capacity.
188 * @param capacity The new capacity.
190 void set_capacity(size_type capacity)
192 boost::unique_lock<boost::mutex> lock(capacity_mutex_);
194 if (capacity_ < capacity)
196 auto to_grow_with = capacity - capacity_;
197 space_available_.release(to_grow_with);
199 else if (capacity_ > capacity)
201 auto to_shrink_with = capacity_ - capacity;
202 // Will block until the desired capacity has been reached.
203 space_available_.acquire(to_shrink_with);
206 capacity_ = capacity;
210 * @return the current capacity of the queue.
212 size_type capacity() const
214 boost::unique_lock<boost::mutex> lock (capacity_mutex_);
220 * @return the current size of the queue (may have changed at the time of
223 size_type size() const
225 return elements_available_.permits();
229 * @return the current available space in the queue (may have changed at
230 * the time of returning).
232 size_type space_available() const
234 return space_available_.permits();
237 void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
241 queues_by_priority_.at(priority).push(element);
243 catch (std::out_of_range&)
245 throw std::runtime_error("Priority not supported by queue");
248 transaction.commit();
249 elements_available_.release();
252 void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
254 for (auto& queue : queues_by_priority_)
256 if (queue.second.try_pop(element))
258 transaction.commit();
259 space_available_.release();
265 throw std::logic_error(
266 "blocking_priority_queue should have contained at least one element but didn't");