]> git.sesse.net Git - casparcg/blob - common/blocking_bounded_queue_adapter.h
Remove most of boost::lexical_cast.
[casparcg] / common / blocking_bounded_queue_adapter.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #pragma once
23
24 #include "semaphore.h"
25
26 #include <boost/noncopyable.hpp>
27
28 namespace caspar {
29
30 /**
31  * Adapts an unbounded non-blocking concurrent queue into a blocking bounded
32  * concurrent queue.
33  *
34  * The queue Q to adapt must support the following use cases:
35  *
36  * Q q;
37  * Q::value_type elem;
38  * q.push(elem);
39  *
40  * and:
41  *
42  * Q q;
43  * Q::value_type elem;
44  * q.try_pop(elem);
45  *
46  * It must also guarantee thread safety for those operations.
47  */
48 template<class Q>
49 class blocking_bounded_queue_adapter : boost::noncopyable
50 {
51 public:
52         typedef typename Q::value_type value_type;
53         typedef unsigned int size_type;
54 private:
55         mutable std::mutex      capacity_mutex_;
56         size_type                               capacity_;
57         semaphore                               space_available_                = capacity_;
58         semaphore                               elements_available_             = 0;
59         Q                                               queue_;
60 public:
61         /**
62          * Constructor.
63          *
64          * @param capacity The capacity of the queue.
65          */
66         blocking_bounded_queue_adapter(size_type capacity)
67                 : capacity_(capacity)
68         {
69         }
70
71         /**
72          * Push an element to the queue, block until room is available.
73          *
74          * @param element The element to push.
75          */
76         void push(const value_type& element)
77         {
78                 space_available_.acquire();
79                 push_after_room_reserved(element);
80         }
81
82         /**
83          * Try to push an element to the queue, returning immediately if room is not
84          * available.
85          *
86          * @param element The element to push.
87          *
88          * @return true if there was room for the element.
89          */
90         bool try_push(const value_type& element)
91         {
92                 bool room_available = space_available_.try_acquire();
93
94                 if (!room_available)
95                         return false;
96
97                 push_after_room_reserved(element);
98
99                 return true;
100         }
101
102         /**
103          * Pop an element from the queue, will block until an element is available.
104          *
105          * @param element The element to store the result in.
106          */
107         void pop(value_type& element)
108         {
109                 elements_available_.acquire();
110                 queue_.try_pop(element);
111                 space_available_.release();
112         }
113
114         /**
115          * Try to pop an element from the queue, returning immediately if no
116          * element is available.
117          *
118          * @param element The element to store the result in.
119          *
120          * @return true if an element was popped.
121          */
122         bool try_pop(value_type& element)
123         {
124                 if (!elements_available_.try_acquire())
125                         return false;
126
127                 queue_.try_pop(element);
128                 space_available_.release();
129
130                 return true;
131         }
132
133         /**
134          * Modify the capacity of the queue. May block if reducing the capacity.
135          *
136          * @param capacity The new capacity.
137          */
138         void set_capacity(size_type capacity)
139         {
140                 std::unique_lock<std::mutex> lock (capacity_mutex_);
141
142                 if (capacity_ < capacity)
143                 {
144                         auto to_grow_with = capacity - capacity_;
145
146                         space_available_.release(to_grow_with);
147                 }
148                 else if (capacity_ > capacity)
149                 {
150                         auto to_shrink_with = capacity_ - capacity;
151
152                         // Will block until the desired capacity has been reached.
153                         space_available_.acquire(to_shrink_with);
154                 }
155
156                 capacity_ = capacity;
157         }
158
159         /**
160          * @return the current capacity of the queue.
161          */
162         size_type capacity() const
163         {
164                 std::unique_lock<std::mutex> lock (capacity_mutex_);
165
166                 return capacity_;
167         }
168
169         /**
170          * @return the current size of the queue (may have changed at the time of
171          *         returning).
172          */
173         size_type size() const
174         {
175                 return elements_available_.permits();
176         }
177 private:
178         void push_after_room_reserved(const value_type& element)
179         {
180                 try
181                 {
182                         queue_.push(element);
183                 }
184                 catch (...)
185                 {
186                         space_available_.release();
187
188                         throw;
189                 }
190
191                 elements_available_.release();
192         }
193 };
194
195 }