]> git.sesse.net Git - casparcg/blob - common/blocking_priority_queue.h
[scene] #564 Made a crawler example scene.
[casparcg] / common / blocking_priority_queue.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21 #pragma once
22
23 #include <stdexcept>
24 #include <map>
25 #include <initializer_list>
26
27 #include <tbb/concurrent_queue.h>
28
29 #include <boost/thread/mutex.hpp>
30
31 #include "semaphore.h"
32
33 namespace caspar {
34
35 /**
36  * Blocking concurrent priority queue supporting a predefined set of
37  * priorities. FIFO ordering is guaranteed within a priority.
38  *
39  * Prio must have the < and > operators defined where a larger instance is of a
40  * higher priority.
41  */
42 template <class T, class Prio>
43 class blocking_priority_queue
44 {
45 public:
46         typedef unsigned int size_type;
47 private:        
48         std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>>    queues_by_priority_;
49         size_type                                                                                                               capacity_;
50         semaphore                                                                                                               space_available_        { capacity_ };
51         semaphore                                                                                                               elements_available_     { 0u };
52         mutable boost::mutex                                                                                    capacity_mutex_;
53 public:
54         /**
55          * Constructor.
56          *
57          * @param capacity   The initial capacity of the queue.
58          * @param priorities A forward iterable range with the priorities to 
59          *                   support.
60          */
61         template<class PrioList>
62         blocking_priority_queue(size_type capacity, const PrioList& priorities)
63                 : capacity_(capacity)
64         {
65                 for (Prio priority : priorities)
66                 {
67                         queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
68                 }
69
70                 // The std::map is read-only from now on, so there *should* (it is 
71                 // unlikely but possible for a std::map implementor to choose to 
72                 // rebalance the tree during read operations) be no race conditions
73                 // regarding the map.
74                 //
75                 // This may be true for vc10 as well:
76                 // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
77         }
78
79         /**
80          * Push an element with a given priority to the queue. Blocks until room
81          * is available.
82          *
83          * @param priority The priority of the element.
84          * @param element  The element.
85          */
86         void push(Prio priority, const T& element)
87         {
88                 acquire_transaction transaction(space_available_);
89
90                 push_acquired(priority, element, transaction);
91         }
92
93         /**
94          * Attempt to push an element with a given priority to the queue. Will
95          * immediately return even if there is no room in the queue.
96          *
97          * @param priority The priority of the element.
98          * @param element  The element.
99          *
100          * @return true if the element was pushed. false if there was no room.
101          */
102         bool try_push(Prio priority, const T& element)
103         {
104                 if (!space_available_.try_acquire())
105                         return false;
106
107                 acquire_transaction transaction(space_available_, true);
108
109                 push_acquired(priority, element, transaction);
110
111                 return true;
112         }
113
114         /**
115          * Pop the element with the highest priority (fifo for elements with the
116          * same priority). Blocks until an element is available.
117          *
118          * @param element The element to store the result in.
119          */
120         void pop(T& element)
121         {
122                 acquire_transaction transaction(elements_available_);
123
124                 pop_acquired_any_priority(element, transaction);
125         }
126
127         /**
128          * Attempt to pop the element with the highest priority (fifo for elements
129          * with the same priority) if available. Does not wait until an element is
130          * available.
131          *
132          * @param element The element to store the result in.
133          *
134          * @return true if an element was available.
135          */
136         bool try_pop(T& element)
137         {
138                 if (!elements_available_.try_acquire())
139                         return false;
140
141                 acquire_transaction transaction(elements_available_, true);
142
143                 pop_acquired_any_priority(element, transaction);
144
145                 return true;
146         }
147
148         /**
149          * Attempt to pop the element with the highest priority (fifo for elements
150          * with the same priority) if available *and* has a minimum priority. Does
151          * not wait until an element satisfying the priority criteria is available.
152          *
153          * @param element          The element to store the result in.
154          * @param minimum_priority The minimum priority to accept.
155          *
156          * @return true if an element was available with the minimum priority.
157          */
158         bool try_pop(T& element, Prio minimum_priority)
159         {
160                 if (!elements_available_.try_acquire())
161                         return false;
162
163                 acquire_transaction transaction(elements_available_, true);
164
165                 for (auto& queue : queues_by_priority_)
166                 {
167                         if (queue.first < minimum_priority)
168                         {
169                                 // Will be true for all queues from this point so we break.
170                                 break;
171                         }
172
173                         if (queue.second.try_pop(element))
174                         {
175                                 transaction.commit();
176                                 space_available_.release();
177
178                                 return true;
179                         }
180                 }
181
182                 return false;
183         }
184
185         /**
186          * Modify the capacity of the queue. May block if reducing the capacity.
187          *
188          * @param capacity The new capacity.
189          */
190         void set_capacity(size_type capacity)
191         {
192                 boost::unique_lock<boost::mutex> lock(capacity_mutex_);
193
194                 if (capacity_ < capacity)
195                 {
196                         auto to_grow_with = capacity - capacity_;
197                         space_available_.release(to_grow_with);
198                 }
199                 else if (capacity_ > capacity)
200                 {
201                         auto to_shrink_with = capacity_ - capacity;
202                         // Will block until the desired capacity has been reached.
203                         space_available_.acquire(to_shrink_with);
204                 }
205
206                 capacity_ = capacity;
207         }
208
209         /**
210          * @return the current capacity of the queue.
211          */
212         size_type capacity() const
213         {
214                 boost::unique_lock<boost::mutex> lock (capacity_mutex_);
215
216                 return capacity_;
217         }
218
219         /**
220          * @return the current size of the queue (may have changed at the time of
221          *         returning).
222          */
223         size_type size() const
224         {
225                 return elements_available_.permits();
226         }
227
228         /**
229          * @return the current available space in the queue (may have changed at
230          *         the time of returning).
231          */
232         size_type space_available() const
233         {
234                 return space_available_.permits();
235         }
236 private:
237         void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
238         {
239                 try
240                 {
241                         queues_by_priority_.at(priority).push(element);
242                 }
243                 catch (std::out_of_range&)
244                 {
245                         throw std::runtime_error("Priority not supported by queue");
246                 }
247
248                 transaction.commit();
249                 elements_available_.release();
250         }
251
252         void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
253         {
254                 for (auto& queue : queues_by_priority_)
255                 {
256                         if (queue.second.try_pop(element))
257                         {
258                                 transaction.commit();
259                                 space_available_.release();
260
261                                 return;
262                         }
263                 }
264
265                 throw std::logic_error(
266                                 "blocking_priority_queue should have contained at least one element but didn't");
267         }
268 };
269
270 }