]> git.sesse.net Git - casparcg/blob - common/blocking_priority_queue.h
101ae31b0954e29ff78f454ea19cadbdfc885949
[casparcg] / common / blocking_priority_queue.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21 #pragma once
22
#include <functional>
#include <initializer_list>
#include <map>
#include <stdexcept>
#include <utility>

#include <tbb/concurrent_queue.h>

#include <boost/foreach.hpp>
#include <boost/thread/mutex.hpp>

#include "semaphore.h"
33
34 namespace caspar {
35
36 /**
37  * Blocking concurrent priority queue supporting a predefined set of
38  * priorities. FIFO ordering is guaranteed within a priority.
39  *
40  * Prio must have the < and > operators defined where a larger instance is of a
41  * higher priority.
42  */
43 template <class T, class Prio>
44 class blocking_priority_queue
45 {
46 public:
47         typedef unsigned int size_type;
48 private:        
49         std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
50         semaphore space_available_;
51         semaphore elements_available_;
52         mutable boost::mutex capacity_mutex_;
53         size_type capacity_;
54 public:
55         /**
56          * Constructor.
57          *
58          * @param capacity   The initial capacity of the queue.
59          * @param priorities A forward iterable container with the priorities to 
60          *                   support.
61          */
62         template<class PrioList>
63         blocking_priority_queue(size_type capacity, const PrioList& priorities)
64                 : space_available_(capacity)
65                 , elements_available_(0u)
66                 , capacity_(capacity)
67         {
68                 BOOST_FOREACH(Prio priority, priorities)
69                 {
70                         queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
71                 }
72
73                 // The std::map is read-only from now on, so there *should* (it is 
74                 // unlikely but possible for a std::map implementor to choose to 
75                 // rebalance the tree during read operations) be no race conditions
76                 // regarding the map.
77                 //
78                 // This may be true for vc10 as well:
79                 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
80         }
81
82         /**
83          * Push an element with a given priority to the queue. Blocks until room
84          * is available.
85          *
86          * @param priority The priority of the element.
87          * @param element  The element.
88          */
89         void push(Prio priority, const T& element)
90         {
91                 acquire_transaction transaction(space_available_);
92
93                 push_acquired(priority, element, transaction);
94         }
95
96         /**
97          * Attempt to push an element with a given priority to the queue. Will
98          * immediately return even if there is no room in the queue.
99          *
100          * @param priority The priority of the element.
101          * @param element  The element.
102          *
103          * @return true if the element was pushed. false if there was no room.
104          */
105         bool try_push(Prio priority, const T& element)
106         {
107                 if (!space_available_.try_acquire())
108                         return false;
109
110                 acquire_transaction transaction(space_available_, true);
111
112                 push_acquired(priority, element, transaction);
113
114                 return true;
115         }
116
117         /**
118          * Pop the element with the highest priority (fifo for elements with the
119          * same priority). Blocks until an element is available.
120          *
121          * @param element The element to store the result in.
122          */
123         void pop(T& element)
124         {
125                 acquire_transaction transaction(elements_available_);
126
127
128                 pop_acquired_any_priority(element, transaction);
129         }
130
131         /**
132          * Attempt to pop the element with the highest priority (fifo for elements
133          * with the same priority) if available. Does not wait until an element is
134          * available.
135          *
136          * @param element The element to store the result in.
137          *
138          * @return true if an element was available.
139          */
140         bool try_pop(T& element)
141         {
142                 if (!elements_available_.try_acquire())
143                         return false;
144
145                 acquire_transaction transaction(elements_available_, true);
146
147                 pop_acquired_any_priority(element, transaction);
148
149                 return true;
150         }
151
152         /**
153          * Attempt to pop the element with the highest priority (fifo for elements
154          * with the same priority) if available *and* has a minimum priority. Does
155          * not wait until an element satisfying the priority criteria is available.
156          *
157          * @param element          The element to store the result in.
158          * @param minimum_priority The minimum priority to accept.
159          *
160          * @return true if an element was available with the minimum priority.
161          */
162         bool try_pop(T& element, Prio minimum_priority)
163         {
164                 if (!elements_available_.try_acquire())
165                         return false;
166
167                 acquire_transaction transaction(elements_available_, true);
168
169                 BOOST_FOREACH(auto& queue, queues_by_priority_)
170                 {
171                         if (queue.first < minimum_priority)
172                         {
173                                 // Will be true for all queues from this point so we break.
174                                 break;
175                         }
176
177                         if (queue.second.try_pop(element))
178                         {
179                                 transaction.commit();
180                                 space_available_.release();
181
182                                 return true;
183                         }
184                 }
185
186                 return false;
187         }
188
189         /**
190          * Modify the capacity of the queue. May block if reducing the capacity.
191          *
192          * @param capacity The new capacity.
193          */
194         void set_capacity(size_type capacity)
195         {
196                 boost::mutex::scoped_lock lock (capacity_mutex_);
197
198                 if (capacity_ < capacity)
199                 {
200                         auto to_grow_with = capacity - capacity_;
201                         space_available_.release(to_grow_with);
202                 }
203                 else if (capacity_ > capacity)
204                 {
205                         auto to_shrink_with = capacity_ - capacity;
206                         // Will block until the desired capacity has been reached.
207                         space_available_.acquire(to_shrink_with);
208                 }
209
210                 capacity_ = capacity;
211         }
212
213         /**
214          * @return the current capacity of the queue.
215          */
216         size_type capacity() const
217         {
218                 boost::mutex::scoped_lock lock (capacity_mutex_);
219
220                 return capacity_;
221         }
222
223         /**
224          * @return the current size of the queue (may have changed at the time of
225          *         returning).
226          */
227         size_type size() const
228         {
229                 return elements_available_.permits();
230         }
231 private:
232         void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
233         {
234                 try
235                 {
236                         queues_by_priority_.at(priority).push(element);
237                 }
238                 catch (std::out_of_range&)
239                 {
240                         throw std::runtime_error("Priority not supported by queue");
241                 }
242
243                 transaction.commit();
244                 elements_available_.release();
245         }
246
247         void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
248         {
249                 BOOST_FOREACH(auto& queue, queues_by_priority_)
250                 {
251                         if (queue.second.try_pop(element))
252                         {
253                                 transaction.commit();
254                                 space_available_.release();
255
256                                 return;
257                         }
258                 }
259
260                 throw std::logic_error(
261                                 "blocking_priority_queue should have contained at least one element but didn't");
262         }
263 };
264
265 }