2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
25 #include <initializer_list>
27 #include <tbb/concurrent_queue.h>
29 #include <boost/foreach.hpp>
30 #include <boost/thread/mutex.hpp>
32 #include "semaphore.h"
37 * Blocking concurrent priority queue supporting a predefined set of
38 * priorities. FIFO ordering is guaranteed within a priority.
40 * Prio must have the < and > operators defined where a larger instance is of a higher priority.
// T is the element type stored in the queue; Prio is the key type used to
// select one of the per-priority sub-queues.
43 template <class T, class Prio>
44 class blocking_priority_queue
// Type used for capacities and sizes.
47 typedef unsigned int size_type;
// One concurrent FIFO queue per supported priority. The std::greater
// comparator makes iteration visit the largest Prio first, which is the order
// in which the pop functions scan the queues.
49 std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
// Counts free slots: initialised with the capacity by the constructor,
// acquired before an element is pushed and released after one is popped.
50 semaphore space_available_;
// Counts stored elements: initialised with 0, released after an element is
// pushed and acquired before one is popped. size() reports its permit count.
51 semaphore elements_available_;
// Locked by set_capacity() and capacity(); mutable so that the const
// capacity() accessor can lock it.
52 mutable boost::mutex capacity_mutex_;
58 * @param capacity The initial capacity of the queue.
59 * @param priorities A forward iterable container with the priorities to support.
// Constructs a queue with `capacity` free slots and creates one empty
// sub-queue for every priority found in `priorities`. PrioList only has to be
// iterable with BOOST_FOREACH and yield values convertible to Prio.
62 template<class PrioList>
63 blocking_priority_queue(size_type capacity, const PrioList& priorities)
// Every slot starts out free ...
64 : space_available_(capacity)
// ... and no element is stored yet.
65 , elements_available_(0u)
68 BOOST_FOREACH(Prio priority, priorities)
// std::map::insert leaves an existing entry untouched, so a priority that is
// listed more than once still ends up with a single queue.
70 queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
73 // The std::map is read-only from now on, so there *should* (it is
74 // unlikely but possible for a std::map implementor to choose to
75 // rebalance the tree during read operations) be no race conditions
78 // This may be true for vc10 as well:
79 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
83 * Push an element with a given priority to the queue. Blocks until room is available.
86 * @param priority The priority of the element.
87 * @param element The element.
// Blocking push: stores `element` in the sub-queue registered for `priority`.
// push_acquired() throws std::runtime_error when no queue exists for that
// priority.
89 void push(Prio priority, const T& element)
// Takes one permit from space_available_ for the duration of the push.
// NOTE(review): acquire_transaction is declared outside this file; it
// presumably waits for a permit on construction and hands it back on
// destruction unless commit() was called -- confirm in semaphore.h.
91 acquire_transaction transaction(space_available_);
// Stores the element and commits the transaction on success.
93 push_acquired(priority, element, transaction);
97 * Attempt to push an element with a given priority to the queue. Will
98 * immediately return even if there is no room in the queue.
100 * @param priority The priority of the element.
101 * @param element The element.
103 * @return true if the element was pushed. false if there was no room.
// Non-blocking counterpart of push().
105 bool try_push(Prio priority, const T& element)
// Try to take a free slot without waiting.
107 if (!space_available_.try_acquire())
// The permit is already held here. NOTE(review): the `true` argument
// presumably tells acquire_transaction that the permit has already been
// acquired so it must not acquire again -- confirm in semaphore.h.
110 acquire_transaction transaction(space_available_, true);
// Stores the element and commits the transaction on success.
112 push_acquired(priority, element, transaction);
118 * Pop the element with the highest priority (fifo for elements with the
119 * same priority). Blocks until an element is available.
121 * @param element The element to store the result in.
// Takes one permit from elements_available_ for the duration of the pop.
// NOTE(review): acquire_transaction is declared outside this file; it
// presumably waits for a permit on construction and hands it back on
// destruction unless commit() was called -- confirm in semaphore.h.
125 acquire_transaction transaction(elements_available_);
// Moves the first element found, scanning from the largest priority downwards,
// into `element` and commits the transaction.
128 pop_acquired_any_priority(element, transaction);
132 * Attempt to pop the element with the highest priority (fifo for elements
133 * with the same priority) if available. Does not wait until an element is available.
136 * @param element The element to store the result in.
138 * @return true if an element was available.
// Non-blocking pop of the highest-priority element, regardless of priority.
140 bool try_pop(T& element)
// Try to take an element permit without waiting.
142 if (!elements_available_.try_acquire())
// The permit is already held here. NOTE(review): the `true` argument
// presumably tells acquire_transaction that the permit has already been
// acquired so it must not acquire again -- confirm in semaphore.h.
145 acquire_transaction transaction(elements_available_, true);
// Moves the element into `element` and commits the transaction.
147 pop_acquired_any_priority(element, transaction);
153 * Attempt to pop the element with the highest priority (fifo for elements
154 * with the same priority) if available *and* has a minimum priority. Does
155 * not wait until an element satisfying the priority criteria is available.
157 * @param element The element to store the result in.
158 * @param minimum_priority The minimum priority to accept.
160 * @return true if an element was available with the minimum priority.
// Non-blocking pop restricted to elements whose priority is not less than
// `minimum_priority`.
162 bool try_pop(T& element, Prio minimum_priority)
// Try to take an element permit without waiting.
164 if (!elements_available_.try_acquire())
// The permit is already held here. NOTE(review): the `true` argument
// presumably tells acquire_transaction that the permit has already been
// acquired, and the permit is presumably handed back if commit() is never
// reached -- confirm in semaphore.h.
167 acquire_transaction transaction(elements_available_, true);
// The map is ordered with std::greater, so the queues are visited from the
// largest priority downwards.
169 BOOST_FOREACH(auto& queue, queues_by_priority_)
171 if (queue.first < minimum_priority)
173 // Will be true for all queues from this point so we break.
// tbb::concurrent_queue::try_pop does not wait; it reports whether an element
// was moved into `element`.
177 if (queue.second.try_pop(element))
// The element permit is consumed for good ...
179 transaction.commit();
// ... and the slot the element occupied becomes available to pushers.
180 space_available_.release();
190 * Modify the capacity of the queue. May block if reducing the capacity.
192 * @param capacity The new capacity.
// Changes the number of slots by adding permits to, or removing permits from,
// space_available_.
194 void set_capacity(size_type capacity)
// Serialises capacity changes with each other and with capacity().
196 boost::mutex::scoped_lock lock (capacity_mutex_);
198 if (capacity_ < capacity)
// Growing: the additional slots become available immediately.
200 auto to_grow_with = capacity - capacity_;
201 space_available_.release(to_grow_with);
203 else if (capacity_ > capacity)
// Shrinking: the surplus permits have to be taken out of space_available_.
205 auto to_shrink_with = capacity_ - capacity;
206 // Will block until the desired capacity has been reached.
// capacity_mutex_ is still locked while waiting here, so capacity() and
// other set_capacity() calls wait as well.
207 space_available_.acquire(to_shrink_with);
// Record the new capacity (a no-op when it did not change).
210 capacity_ = capacity;
214 * @return the current capacity of the queue.
// Accessor for the configured capacity.
216 size_type capacity() const
// Same mutex that set_capacity() holds while it updates capacity_.
218 boost::mutex::scoped_lock lock (capacity_mutex_);
224 * @return the current size of the queue (may have changed at the time of returning).
// Reports how many elements are currently counted as available.
227 size_type size() const
// elements_available_ gains a permit per pushed element and loses one per
// pop, so its permit count is used as the size. No lock is taken here.
229 return elements_available_.permits();
// Shared tail of push() and try_push(). The caller has already obtained a
// permit from space_available_, represented by `transaction`.
// Throws std::runtime_error if no queue exists for `priority`.
232 void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
// std::map::at throws std::out_of_range for a key that has no entry, i.e. a
// priority for which the constructor created no queue.
236 queues_by_priority_.at(priority).push(element);
238 catch (std::out_of_range&)
// Thrown before commit() is reached, so the transaction is left uncommitted.
240 throw std::runtime_error("Priority not supported by queue");
// The element is stored: keep the space permit taken ...
243 transaction.commit();
// ... and publish one element permit to the poppers.
244 elements_available_.release();
// Shared tail of the pop functions that accept any priority. The caller has
// already obtained a permit from elements_available_, represented by
// `transaction`. Throws std::logic_error if every queue turns out to be empty.
247 void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
// The map is ordered with std::greater, so the queues are visited from the
// largest priority downwards.
249 BOOST_FOREACH(auto& queue, queues_by_priority_)
// tbb::concurrent_queue::try_pop does not wait; it reports whether an element
// was moved into `element`.
251 if (queue.second.try_pop(element))
// The element permit is consumed for good ...
253 transaction.commit();
// ... and the slot the element occupied becomes available to pushers.
254 space_available_.release();
// Reached only when no queue yielded an element although a permit was held.
260 throw std::logic_error(
261 "blocking_priority_queue should have contained at least one element but didn't");