]> git.sesse.net Git - casparcg/blob - common/semaphore.h
Remove most of boost::lexical_cast.
[casparcg] / common / semaphore.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #pragma once
23
24 #include <cmath>
25
26 #include <boost/noncopyable.hpp>
27
28 #include <map>
29 #include <mutex>
30 #include <queue>
31 #include <functional>
32
33 #include <boost/thread/condition_variable.hpp>
34
35 namespace caspar {
36
37 /**
38  * Counting semaphore modelled after java.util.concurrent.Semaphore
39  */
40 class semaphore : boost::noncopyable
41 {
42         mutable std::mutex                                                                              mutex_;
43         unsigned int                                                                                            permits_;
44         boost::condition_variable_any                                                           permits_available_;
45         std::map<unsigned int, std::queue<std::function<void()>>>       callbacks_per_requested_permits_;
46 public:
47         /**
48          * Constructor.
49          *
50          * @param permits The initial number of permits.
51          */
52         explicit semaphore(unsigned int permits)
53                 : permits_(permits)
54         {
55         }
56
57         /**
58          * Release a permit.
59          */
60         void release()
61         {
62                 std::unique_lock<std::mutex> lock(mutex_);
63
64                 ++permits_;
65
66                 perform_callback_based_acquire();
67                 permits_available_.notify_one();
68         }
69
70         /**
71          * Release a permit.
72          *
73          * @param permits The number of permits to release.
74          */
75         void release(unsigned int permits)
76         {
77                 std::unique_lock<std::mutex> lock(mutex_);
78
79                 permits_ += permits;
80
81                 perform_callback_based_acquire();
82                 permits_available_.notify_all();
83         }
84
85         /**
86          * Acquire a permit. Will block until one becomes available if no permit is
87          * currently available.
88          */
89         void acquire()
90         {
91                 std::unique_lock<std::mutex> lock(mutex_);
92
93                 while (permits_ == 0u)
94                 {
95                         permits_available_.wait(lock);
96                 }
97
98                 --permits_;
99         }
100
101         /**
102          * Acquire a number of permits. Will block until the given number of
103          * permits has been acquired if not enough permits are currently available.
104          *
105          * @param permits The number of permits to acquire.
106          */
107         void acquire(unsigned int permits)
108         {
109                 std::unique_lock<std::mutex> lock(mutex_);
110                 auto num_acquired = 0u;
111
112                 while (true)
113                 {
114                         auto num_wanted = permits - num_acquired;
115                         auto to_drain   = std::min(num_wanted, permits_);
116
117                         permits_                -= to_drain;
118                         num_acquired    += to_drain;
119
120                         if (num_acquired == permits)
121                                 break;
122
123                         permits_available_.wait(lock);
124                 }
125         }
126
127         /**
128         * Acquire a number of permits. Will not block, but instead invoke a callback
129         * when the specified number of permits are available and has been acquired.
130         *
131         * @param permits           The number of permits to acquire.
132         * @param acquired_callback The callback to invoke when acquired.
133         */
134         void acquire(unsigned int permits, std::function<void()> acquired_callback)
135         {
136                 std::unique_lock<std::mutex> lock(mutex_);
137
138                 if (permits_ >= permits)
139                 {
140                         permits_ -= permits;
141                         lock.unlock();
142                         acquired_callback();
143                 }
144                 else
145                         callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
146         }
147
148         /**
149          * Acquire a number of permits. Will block until the given number of
150          * permits has been acquired if not enough permits are currently available
151          * or the timeout has passed.
152          *
153          * @param permits The number of permits to acquire.
154          * @param timeout The timeout (will be used for each permit).
155          *
156          * @return whether successfully acquired within timeout or not.
157          */
158         template <typename Rep, typename Period>
159         bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
160         {
161                 std::unique_lock<std::mutex> lock(mutex_);
162                 auto num_acquired = 0u;
163
164                 while (true)
165                 {
166                         auto num_wanted = permits - num_acquired;
167                         auto to_drain   = std::min(num_wanted, permits_);
168
169                         permits_                -= to_drain;
170                         num_acquired    += to_drain;
171
172                         if (num_acquired == permits)
173                                 break;
174
175                         if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
176                         {
177                                 lock.unlock();
178                                 release(num_acquired);
179                                 return false;
180                         }
181                 }
182
183                 return true;
184         }
185
186         /**
187          * Acquire one permits if permits are currently available. Does not block
188          * until one is available, but returns immediately if unavailable.
189          *
190          * @return true if a permit was acquired or false if no permits where
191          *         currently available.
192          */
193         bool try_acquire()
194         {
195                 std::unique_lock<std::mutex> lock(mutex_);
196
197                 if (permits_ == 0u)
198                         return false;
199                 else
200                 {
201                         --permits_;
202
203                         return true;
204                 }
205         }
206
207         /**
208          * @return the current number of permits (may have changed at the time of
209          *         return).
210          */
211         unsigned int permits() const
212         {
213                 std::unique_lock<std::mutex> lock(mutex_);
214
215                 return permits_;
216         }
217
218 private:
219         void perform_callback_based_acquire()
220         {
221                 if (callbacks_per_requested_permits_.empty())
222                         return;
223
224                 while (
225                         !callbacks_per_requested_permits_.empty() &&
226                         callbacks_per_requested_permits_.begin()->first <= permits_)
227                 {
228                         auto requested_permits_and_callbacks    = callbacks_per_requested_permits_.begin();
229                         auto requested_permits                                  = requested_permits_and_callbacks->first;
230                         auto& callbacks                                                 = requested_permits_and_callbacks->second;
231
232                         if (callbacks.empty())
233                         {
234                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
235                                 continue;
236                         }
237
238                         auto& callback                                                  = callbacks.front();
239
240                         permits_ -= requested_permits;
241                         mutex_.unlock();
242
243                         try
244                         {
245                                 callback();
246                         }
247                         catch (...)
248                         {
249                         }
250
251                         mutex_.lock();
252                         callbacks.pop();
253
254                         if (callbacks.empty())
255                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
256                 }
257         }
258 };
259
260 /**
261  * Enables RAII-style acquire/release on scope exit unless committed.
262  */
263 class acquire_transaction : boost::noncopyable
264 {
265         semaphore& semaphore_;
266         bool committed_;
267 public:
268         /**
269          * Constructor.
270          *
271          * @param semaphore        The semaphore to acquire one permit from.
272          * @param already_acquired Whether a permit has already been acquired or not.
273          */
274         acquire_transaction(semaphore& semaphore, bool already_acquired = false)
275                 : semaphore_(semaphore)
276                 , committed_(false)
277         {
278                 if (!already_acquired)
279                         semaphore_.acquire();
280         }
281
282         /**
283          * Destructor that will release one permit if commit() has not been called.
284          */
285         ~acquire_transaction()
286         {
287                 if (!committed_)
288                         semaphore_.release();
289         }
290
291         /**
292          * Ensure that the acquired permit is kept on destruction.
293          */
294         void commit()
295         {
296                 committed_ = true;
297         }
298 };
299
300 }