]> git.sesse.net Git - casparcg/blob - common/blocking_priority_queue.h
Remove most of boost::lexical_cast.
[casparcg] / common / blocking_priority_queue.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21 #pragma once
22
23 #include <stdexcept>
24 #include <map>
25 #include <initializer_list>
26
27 #include <tbb/concurrent_queue.h>
28
29 #include "semaphore.h"
30
31 namespace caspar {
32
33 /**
34  * Blocking concurrent priority queue supporting a predefined set of
35  * priorities. FIFO ordering is guaranteed within a priority.
36  *
37  * Prio must have the < and > operators defined where a larger instance is of a
38  * higher priority.
39  */
40 template <class T, class Prio>
41 class blocking_priority_queue
42 {
43 public:
44         typedef unsigned int size_type;
45 private:        
46         std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>>    queues_by_priority_;
47         size_type                                                                                                               capacity_;
48         semaphore                                                                                                               space_available_        { capacity_ };
49         semaphore                                                                                                               elements_available_     { 0u };
50         mutable std::mutex                                                                                      capacity_mutex_;
51 public:
52         /**
53          * Constructor.
54          *
55          * @param capacity   The initial capacity of the queue.
56          * @param priorities A forward iterable range with the priorities to 
57          *                   support.
58          */
59         template<class PrioList>
60         blocking_priority_queue(size_type capacity, const PrioList& priorities)
61                 : capacity_(capacity)
62         {
63                 for (Prio priority : priorities)
64                 {
65                         queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
66                 }
67
68                 // The std::map is read-only from now on, so there *should* (it is 
69                 // unlikely but possible for a std::map implementor to choose to 
70                 // rebalance the tree during read operations) be no race conditions
71                 // regarding the map.
72                 //
73                 // This may be true for vc10 as well:
74                 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
75         }
76
77         /**
78          * Push an element with a given priority to the queue. Blocks until room
79          * is available.
80          *
81          * @param priority The priority of the element.
82          * @param element  The element.
83          */
84         void push(Prio priority, const T& element)
85         {
86                 acquire_transaction transaction(space_available_);
87
88                 push_acquired(priority, element, transaction);
89         }
90
91         /**
92          * Attempt to push an element with a given priority to the queue. Will
93          * immediately return even if there is no room in the queue.
94          *
95          * @param priority The priority of the element.
96          * @param element  The element.
97          *
98          * @return true if the element was pushed. false if there was no room.
99          */
100         bool try_push(Prio priority, const T& element)
101         {
102                 if (!space_available_.try_acquire())
103                         return false;
104
105                 acquire_transaction transaction(space_available_, true);
106
107                 push_acquired(priority, element, transaction);
108
109                 return true;
110         }
111
112         /**
113          * Pop the element with the highest priority (fifo for elements with the
114          * same priority). Blocks until an element is available.
115          *
116          * @param element The element to store the result in.
117          */
118         void pop(T& element)
119         {
120                 acquire_transaction transaction(elements_available_);
121
122                 pop_acquired_any_priority(element, transaction);
123         }
124
125         /**
126          * Attempt to pop the element with the highest priority (fifo for elements
127          * with the same priority) if available. Does not wait until an element is
128          * available.
129          *
130          * @param element The element to store the result in.
131          *
132          * @return true if an element was available.
133          */
134         bool try_pop(T& element)
135         {
136                 if (!elements_available_.try_acquire())
137                         return false;
138
139                 acquire_transaction transaction(elements_available_, true);
140
141                 pop_acquired_any_priority(element, transaction);
142
143                 return true;
144         }
145
146         /**
147          * Attempt to pop the element with the highest priority (fifo for elements
148          * with the same priority) if available *and* has a minimum priority. Does
149          * not wait until an element satisfying the priority criteria is available.
150          *
151          * @param element          The element to store the result in.
152          * @param minimum_priority The minimum priority to accept.
153          *
154          * @return true if an element was available with the minimum priority.
155          */
156         bool try_pop(T& element, Prio minimum_priority)
157         {
158                 if (!elements_available_.try_acquire())
159                         return false;
160
161                 acquire_transaction transaction(elements_available_, true);
162
163                 for (auto& queue : queues_by_priority_)
164                 {
165                         if (queue.first < minimum_priority)
166                         {
167                                 // Will be true for all queues from this point so we break.
168                                 break;
169                         }
170
171                         if (queue.second.try_pop(element))
172                         {
173                                 transaction.commit();
174                                 space_available_.release();
175
176                                 return true;
177                         }
178                 }
179
180                 return false;
181         }
182
183         /**
184          * Modify the capacity of the queue. May block if reducing the capacity.
185          *
186          * @param capacity The new capacity.
187          */
188         void set_capacity(size_type capacity)
189         {
190                 std::unique_lock<std::mutex> lock(capacity_mutex_);
191
192                 if (capacity_ < capacity)
193                 {
194                         auto to_grow_with = capacity - capacity_;
195                         space_available_.release(to_grow_with);
196                 }
197                 else if (capacity_ > capacity)
198                 {
199                         auto to_shrink_with = capacity_ - capacity;
200                         // Will block until the desired capacity has been reached.
201                         space_available_.acquire(to_shrink_with);
202                 }
203
204                 capacity_ = capacity;
205         }
206
207         /**
208          * @return the current capacity of the queue.
209          */
210         size_type capacity() const
211         {
212                 std::unique_lock<std::mutex> lock (capacity_mutex_);
213
214                 return capacity_;
215         }
216
217         /**
218          * @return the current size of the queue (may have changed at the time of
219          *         returning).
220          */
221         size_type size() const
222         {
223                 return elements_available_.permits();
224         }
225
226         /**
227          * @return the current available space in the queue (may have changed at
228          *         the time of returning).
229          */
230         size_type space_available() const
231         {
232                 return space_available_.permits();
233         }
234 private:
235         void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
236         {
237                 try
238                 {
239                         queues_by_priority_.at(priority).push(element);
240                 }
241                 catch (std::out_of_range&)
242                 {
243                         throw std::runtime_error("Priority not supported by queue");
244                 }
245
246                 transaction.commit();
247                 elements_available_.release();
248         }
249
250         void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
251         {
252                 for (auto& queue : queues_by_priority_)
253                 {
254                         if (queue.second.try_pop(element))
255                         {
256                                 transaction.commit();
257                                 space_available_.release();
258
259                                 return;
260                         }
261                 }
262
263                 throw std::logic_error(
264                                 "blocking_priority_queue should have contained at least one element but didn't");
265         }
266 };
267
268 }