2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
25 #include <initializer_list>
27 #include <tbb/concurrent_queue.h>
29 #include <boost/foreach.hpp>
30 #include <boost/thread/mutex.hpp>
32 #include "semaphore.h"
37 * Blocking concurrent priority queue supporting a predefined set of
38 * priorities. FIFO ordering is guaranteed within a priority.
40 * Prio must have the < and > operators defined where a larger instance is of a
43 template <class T, class Prio>
44 class blocking_priority_queue
47 typedef unsigned int size_type;
49 std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
50 semaphore space_available_;
51 semaphore elements_available_;
52 mutable boost::mutex capacity_mutex_;
58 * @param capacity The initial capacity of the queue.
59 * @param priorities A forward iterable container with the priorities to
62 template<class PrioList>
63 blocking_priority_queue(size_type capacity, const PrioList& priorities)
64 : space_available_(capacity)
65 , elements_available_(0u)
68 BOOST_FOREACH(Prio priority, priorities)
70 queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
73 // The std::map is read-only from now on, so there *should* (it is
74 // unlikely but possible for a std::map implementor to choose to
75 // rebalance the tree during read operations) be no race conditions
78 // This may be true for vc10 as well:
79 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
83 * Push an element with a given priority to the queue. Blocks until room
86 * @param priority The priority of the element.
87 * @param element The element.
89 void push(Prio priority, const T& element)
91 acquire_transaction transaction(space_available_);
93 push_acquired(priority, element, transaction);
97 * Attempt to push an element with a given priority to the queue. Will
98 * immediately return even if there is no room in the queue.
100 * @param priority The priority of the element.
101 * @param element The element.
103 * @return true if the element was pushed. false if there was no room.
105 bool try_push(Prio priority, const T& element)
107 if (!space_available_.try_acquire())
110 acquire_transaction transaction(space_available_, true);
112 push_acquired(priority, element, transaction);
118 * Pop the element with the highest priority (fifo for elements with the
119 * same priority). Blocks until an element is available.
121 * @param element The element to store the result in.
125 acquire_transaction transaction(elements_available_);
127 pop_acquired_any_priority(element, transaction);
131 * Attempt to pop the element with the highest priority (fifo for elements
132 * with the same priority) if available. Does not wait until an element is
135 * @param element The element to store the result in.
137 * @return true if an element was available.
139 bool try_pop(T& element)
141 if (!elements_available_.try_acquire())
144 acquire_transaction transaction(elements_available_, true);
146 pop_acquired_any_priority(element, transaction);
152 * Attempt to pop the element with the highest priority (fifo for elements
153 * with the same priority) if available *and* has a minimum priority. Does
154 * not wait until an element satisfying the priority criteria is available.
156 * @param element The element to store the result in.
157 * @param minimum_priority The minimum priority to accept.
159 * @return true if an element was available with the minimum priority.
161 bool try_pop(T& element, Prio minimum_priority)
163 if (!elements_available_.try_acquire())
166 acquire_transaction transaction(elements_available_, true);
168 BOOST_FOREACH(auto& queue, queues_by_priority_)
170 if (queue.first < minimum_priority)
172 // Will be true for all queues from this point so we break.
176 if (queue.second.try_pop(element))
178 transaction.commit();
179 space_available_.release();
189 * Modify the capacity of the queue. May block if reducing the capacity.
191 * @param capacity The new capacity.
193 void set_capacity(size_type capacity)
195 boost::mutex::scoped_lock lock (capacity_mutex_);
197 if (capacity_ < capacity)
199 auto to_grow_with = capacity - capacity_;
200 space_available_.release(to_grow_with);
202 else if (capacity_ > capacity)
204 auto to_shrink_with = capacity_ - capacity;
205 // Will block until the desired capacity has been reached.
206 space_available_.acquire(to_shrink_with);
209 capacity_ = capacity;
213 * @return the current capacity of the queue.
215 size_type capacity() const
217 boost::mutex::scoped_lock lock (capacity_mutex_);
223 * @return the current size of the queue (may have changed at the time of
226 size_type size() const
228 return elements_available_.permits();
231 void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
235 queues_by_priority_.at(priority).push(element);
237 catch (std::out_of_range&)
239 throw std::runtime_error("Priority not supported by queue");
242 transaction.commit();
243 elements_available_.release();
246 void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
248 BOOST_FOREACH(auto& queue, queues_by_priority_)
250 if (queue.second.try_pop(element))
252 transaction.commit();
253 space_available_.release();
259 throw std::logic_error(
260 "blocking_priority_queue should have contained at least one element but didn't");