]> git.sesse.net Git - casparcg/blob - common/semaphore.h
[CHANGELOG] Updated
[casparcg] / common / semaphore.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #pragma once
23
24 #include <cmath>
25
26 #include <boost/noncopyable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition_variable.hpp>
29
30 #include <map>
31 #include <queue>
32 #include <functional>
33
34 namespace caspar {
35
36 /**
37  * Counting semaphore modelled after java.util.concurrent.Semaphore
38  */
39 class semaphore : boost::noncopyable
40 {
41         mutable boost::mutex                                                                            mutex_;
42         unsigned int                                                                                            permits_;
43         boost::condition_variable_any                                                           permits_available_;
44         std::map<unsigned int, std::queue<std::function<void()>>>       callbacks_per_requested_permits_;
45 public:
46         /**
47          * Constructor.
48          *
49          * @param permits The initial number of permits.
50          */
51         explicit semaphore(unsigned int permits)
52                 : permits_(permits)
53         {
54         }
55
56         /**
57          * Release a permit.
58          */
59         void release()
60         {
61                 boost::unique_lock<boost::mutex> lock(mutex_);
62
63                 ++permits_;
64
65                 perform_callback_based_acquire();
66                 permits_available_.notify_one();
67         }
68
69         /**
70          * Release a permit.
71          *
72          * @param permits The number of permits to release.
73          */
74         void release(unsigned int permits)
75         {
76                 boost::unique_lock<boost::mutex> lock(mutex_);
77
78                 permits_ += permits;
79
80                 perform_callback_based_acquire();
81                 permits_available_.notify_all();
82         }
83
84         /**
85          * Acquire a permit. Will block until one becomes available if no permit is
86          * currently available.
87          */
88         void acquire()
89         {
90                 boost::unique_lock<boost::mutex> lock(mutex_);
91
92                 while (permits_ == 0u)
93                 {
94                         permits_available_.wait(lock);
95                 }
96
97                 --permits_;
98         }
99
100         /**
101          * Acquire a number of permits. Will block until the given number of
102          * permits has been acquired if not enough permits are currently available.
103          *
104          * @param permits The number of permits to acquire.
105          */
106         void acquire(unsigned int permits)
107         {
108                 boost::unique_lock<boost::mutex> lock(mutex_);
109                 auto num_acquired = 0u;
110
111                 while (true)
112                 {
113                         auto num_wanted = permits - num_acquired;
114                         auto to_drain   = std::min(num_wanted, permits_);
115
116                         permits_                -= to_drain;
117                         num_acquired    += to_drain;
118
119                         if (num_acquired == permits)
120                                 break;
121
122                         permits_available_.wait(lock);
123                 }
124         }
125
126         /**
127         * Acquire a number of permits. Will not block, but instead invoke a callback
128         * when the specified number of permits are available and has been acquired.
129         *
130         * @param permits           The number of permits to acquire.
131         * @param acquired_callback The callback to invoke when acquired.
132         */
133         void acquire(unsigned int permits, std::function<void()> acquired_callback)
134         {
135                 boost::unique_lock<boost::mutex> lock(mutex_);
136
137                 if (permits_ >= permits)
138                 {
139                         permits_ -= permits;
140                         lock.unlock();
141                         acquired_callback();
142                 }
143                 else
144                         callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
145         }
146
147         /**
148          * Acquire a number of permits. Will block until the given number of
149          * permits has been acquired if not enough permits are currently available
150          * or the timeout has passed.
151          *
152          * @param permits The number of permits to acquire.
153          * @param timeout The timeout (will be used for each permit).
154          *
155          * @return whether successfully acquired within timeout or not.
156          */
157         template <typename Rep, typename Period>
158         bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
159         {
160                 boost::unique_lock<boost::mutex> lock(mutex_);
161                 auto num_acquired = 0u;
162
163                 while (true)
164                 {
165                         auto num_wanted = permits - num_acquired;
166                         auto to_drain   = std::min(num_wanted, permits_);
167
168                         permits_                -= to_drain;
169                         num_acquired    += to_drain;
170
171                         if (num_acquired == permits)
172                                 break;
173
174                         if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
175                         {
176                                 lock.unlock();
177                                 release(num_acquired);
178                                 return false;
179                         }
180                 }
181
182                 return true;
183         }
184
185         /**
186          * Acquire one permits if permits are currently available. Does not block
187          * until one is available, but returns immediately if unavailable.
188          *
189          * @return true if a permit was acquired or false if no permits where
190          *         currently available.
191          */
192         bool try_acquire()
193         {
194                 boost::unique_lock<boost::mutex> lock(mutex_);
195
196                 if (permits_ == 0u)
197                         return false;
198                 else
199                 {
200                         --permits_;
201
202                         return true;
203                 }
204         }
205
206         /**
207          * @return the current number of permits (may have changed at the time of
208          *         return).
209          */
210         unsigned int permits() const
211         {
212                 boost::unique_lock<boost::mutex> lock(mutex_);
213
214                 return permits_;
215         }
216
217 private:
218         void perform_callback_based_acquire()
219         {
220                 if (callbacks_per_requested_permits_.empty())
221                         return;
222
223                 while (
224                         !callbacks_per_requested_permits_.empty() &&
225                         callbacks_per_requested_permits_.begin()->first <= permits_)
226                 {
227                         auto requested_permits_and_callbacks    = callbacks_per_requested_permits_.begin();
228                         auto requested_permits                                  = requested_permits_and_callbacks->first;
229                         auto& callbacks                                                 = requested_permits_and_callbacks->second;
230
231                         if (callbacks.empty())
232                         {
233                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
234                                 continue;
235                         }
236
237                         auto& callback                                                  = callbacks.front();
238
239                         permits_ -= requested_permits;
240                         mutex_.unlock();
241
242                         try
243                         {
244                                 callback();
245                         }
246                         catch (...)
247                         {
248                         }
249
250                         mutex_.lock();
251                         callbacks.pop();
252
253                         if (callbacks.empty())
254                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
255                 }
256         }
257 };
258
259 /**
260  * Enables RAII-style acquire/release on scope exit unless committed.
261  */
262 class acquire_transaction : boost::noncopyable
263 {
264         semaphore& semaphore_;
265         bool committed_;
266 public:
267         /**
268          * Constructor.
269          *
270          * @param semaphore        The semaphore to acquire one permit from.
271          * @param already_acquired Whether a permit has already been acquired or not.
272          */
273         acquire_transaction(semaphore& semaphore, bool already_acquired = false)
274                 : semaphore_(semaphore)
275                 , committed_(false)
276         {
277                 if (!already_acquired)
278                         semaphore_.acquire();
279         }
280
281         /**
282          * Destructor that will release one permit if commit() has not been called.
283          */
284         ~acquire_transaction()
285         {
286                 if (!committed_)
287                         semaphore_.release();
288         }
289
290         /**
291          * Ensure that the acquired permit is kept on destruction.
292          */
293         void commit()
294         {
295                 committed_ = true;
296         }
297 };
298
299 }