]> git.sesse.net Git - casparcg/blob - common/blocking_priority_queue.h
Merged CLK changes from trunk, and separated delimiter message splitting and codepage...
[casparcg] / common / blocking_priority_queue.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21 #pragma once
22
23 #include <stdexcept>
24 #include <map>
25 #include <initializer_list>
26
27 #include <tbb/concurrent_queue.h>
28
29 #include <boost/foreach.hpp>
30 #include <boost/thread/mutex.hpp>
31
32 #include "semaphore.h"
33
34 namespace caspar {
35
36 /**
37  * Blocking concurrent priority queue supporting a predefined set of
38  * priorities. FIFO ordering is guaranteed within a priority.
39  *
40  * Prio must have the < and > operators defined where a larger instance is of a
41  * higher priority.
42  */
43 template <class T, class Prio>
44 class blocking_priority_queue
45 {
46 public:
47         typedef unsigned int size_type;
48 private:        
49         std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
50         semaphore space_available_;
51         semaphore elements_available_;
52         mutable boost::mutex capacity_mutex_;
53         size_type capacity_;
54 public:
55         /**
56          * Constructor.
57          *
58          * @param capacity   The initial capacity of the queue.
59          * @param priorities A forward iterable container with the priorities to 
60          *                   support.
61          */
62         template<class PrioList>
63         blocking_priority_queue(size_type capacity, const PrioList& priorities)
64                 : space_available_(capacity)
65                 , elements_available_(0u)
66                 , capacity_(capacity)
67         {
68                 BOOST_FOREACH(Prio priority, priorities)
69                 {
70                         queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
71                 }
72
73                 // The std::map is read-only from now on, so there *should* (it is 
74                 // unlikely but possible for a std::map implementor to choose to 
75                 // rebalance the tree during read operations) be no race conditions
76                 // regarding the map.
77                 //
78                 // This may be true for vc10 as well:
79                 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
80         }
81
82         /**
83          * Push an element with a given priority to the queue. Blocks until room
84          * is available.
85          *
86          * @param priority The priority of the element.
87          * @param element  The element.
88          */
89         void push(Prio priority, const T& element)
90         {
91                 acquire_transaction transaction(space_available_);
92
93                 push_acquired(priority, element, transaction);
94         }
95
96         /**
97          * Attempt to push an element with a given priority to the queue. Will
98          * immediately return even if there is no room in the queue.
99          *
100          * @param priority The priority of the element.
101          * @param element  The element.
102          *
103          * @return true if the element was pushed. false if there was no room.
104          */
105         bool try_push(Prio priority, const T& element)
106         {
107                 if (!space_available_.try_acquire())
108                         return false;
109
110                 acquire_transaction transaction(space_available_, true);
111
112                 push_acquired(priority, element, transaction);
113
114                 return true;
115         }
116
117         /**
118          * Pop the element with the highest priority (fifo for elements with the
119          * same priority). Blocks until an element is available.
120          *
121          * @param element The element to store the result in.
122          */
123         void pop(T& element)
124         {
125                 acquire_transaction transaction(elements_available_);
126
127                 pop_acquired_any_priority(element, transaction);
128         }
129
130         /**
131          * Attempt to pop the element with the highest priority (fifo for elements
132          * with the same priority) if available. Does not wait until an element is
133          * available.
134          *
135          * @param element The element to store the result in.
136          *
137          * @return true if an element was available.
138          */
139         bool try_pop(T& element)
140         {
141                 if (!elements_available_.try_acquire())
142                         return false;
143
144                 acquire_transaction transaction(elements_available_, true);
145
146                 pop_acquired_any_priority(element, transaction);
147
148                 return true;
149         }
150
151         /**
152          * Attempt to pop the element with the highest priority (fifo for elements
153          * with the same priority) if available *and* has a minimum priority. Does
154          * not wait until an element satisfying the priority criteria is available.
155          *
156          * @param element          The element to store the result in.
157          * @param minimum_priority The minimum priority to accept.
158          *
159          * @return true if an element was available with the minimum priority.
160          */
161         bool try_pop(T& element, Prio minimum_priority)
162         {
163                 if (!elements_available_.try_acquire())
164                         return false;
165
166                 acquire_transaction transaction(elements_available_, true);
167
168                 BOOST_FOREACH(auto& queue, queues_by_priority_)
169                 {
170                         if (queue.first < minimum_priority)
171                         {
172                                 // Will be true for all queues from this point so we break.
173                                 break;
174                         }
175
176                         if (queue.second.try_pop(element))
177                         {
178                                 transaction.commit();
179                                 space_available_.release();
180
181                                 return true;
182                         }
183                 }
184
185                 return false;
186         }
187
188         /**
189          * Modify the capacity of the queue. May block if reducing the capacity.
190          *
191          * @param capacity The new capacity.
192          */
193         void set_capacity(size_type capacity)
194         {
195                 boost::mutex::scoped_lock lock (capacity_mutex_);
196
197                 if (capacity_ < capacity)
198                 {
199                         auto to_grow_with = capacity - capacity_;
200                         space_available_.release(to_grow_with);
201                 }
202                 else if (capacity_ > capacity)
203                 {
204                         auto to_shrink_with = capacity_ - capacity;
205                         // Will block until the desired capacity has been reached.
206                         space_available_.acquire(to_shrink_with);
207                 }
208
209                 capacity_ = capacity;
210         }
211
212         /**
213          * @return the current capacity of the queue.
214          */
215         size_type capacity() const
216         {
217                 boost::mutex::scoped_lock lock (capacity_mutex_);
218
219                 return capacity_;
220         }
221
222         /**
223          * @return the current size of the queue (may have changed at the time of
224          *         returning).
225          */
226         size_type size() const
227         {
228                 return elements_available_.permits();
229         }
230 private:
231         void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
232         {
233                 try
234                 {
235                         queues_by_priority_.at(priority).push(element);
236                 }
237                 catch (std::out_of_range&)
238                 {
239                         throw std::runtime_error("Priority not supported by queue");
240                 }
241
242                 transaction.commit();
243                 elements_available_.release();
244         }
245
246         void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
247         {
248                 BOOST_FOREACH(auto& queue, queues_by_priority_)
249                 {
250                         if (queue.second.try_pop(element))
251                         {
252                                 transaction.commit();
253                                 space_available_.release();
254
255                                 return;
256                         }
257                 }
258
259                 throw std::logic_error(
260                                 "blocking_priority_queue should have contained at least one element but didn't");
261         }
262 };
263
264 }