]> git.sesse.net Git - casparcg/blob - common/semaphore.h
Created a consumer that provides sync to a channel based on the pace of another chann...
[casparcg] / common / semaphore.h
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #pragma once
23
24 #include <cmath>
25
26 #include <boost/noncopyable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition_variable.hpp>
29
30 #include <map>
31 #include <queue>
32 #include <functional>
33
34 namespace caspar {
35
36 template <class N, class Func>
37 void repeat_n(N times_to_repeat_block, const Func& func)
38 {
39         for (N i = 0; i < times_to_repeat_block; ++i)
40         {
41                 func();
42         }
43 }
44
45 /**
46  * Counting semaphore modelled after java.util.concurrent.Semaphore
47  */
48 class semaphore : boost::noncopyable
49 {
50         mutable boost::mutex                                                                            mutex_;
51         unsigned int                                                                                            permits_;
52         boost::condition_variable_any                                                           permits_available_;
53         std::map<unsigned int, std::queue<std::function<void()>>>       callbacks_per_requested_permits_;
54 public:
55         /**
56          * Constructor.
57          *
58          * @param permits The initial number of permits.
59          */
60         explicit semaphore(unsigned int permits)
61                 : permits_(permits)
62         {
63         }
64
65         /**
66          * Release a permit.
67          */
68         void release()
69         {
70                 boost::unique_lock<boost::mutex> lock(mutex_);
71
72                 ++permits_;
73
74                 perform_callback_based_acquire();
75                 permits_available_.notify_one();
76         }
77
78         /**
79          * Release a permit.
80          *
81          * @param permits The number of permits to release.
82          */
83         void release(unsigned int permits)
84         {
85                 boost::unique_lock<boost::mutex> lock(mutex_);
86
87                 permits_ += permits;
88
89                 perform_callback_based_acquire();
90                 repeat_n(permits, [this] { permits_available_.notify_one(); });
91         }
92
93         /**
94          * Acquire a permit. Will block until one becomes available if no permit is
95          * currently available.
96          */
97         void acquire()
98         {
99                 boost::unique_lock<boost::mutex> lock(mutex_);
100
101                 while (permits_ == 0u)
102                 {
103                         permits_available_.wait(lock);
104                 }
105
106                 --permits_;
107         }
108
109         /**
110          * Acquire a number of permits. Will block until the given number of
111          * permits has been acquired if not enough permits are currently available.
112          *
113          * @param permits The number of permits to acquire.
114          */
115         void acquire(unsigned int permits)
116         {
117                 boost::unique_lock<boost::mutex> lock(mutex_);
118                 auto num_acquired = 0u;
119
120                 while (true)
121                 {
122                         auto num_wanted = permits - num_acquired;
123                         auto to_drain   = std::min(num_wanted, permits_);
124
125                         permits_                -= to_drain;
126                         num_acquired    += to_drain;
127
128                         if (num_acquired == permits)
129                                 break;
130
131                         permits_available_.wait(lock);
132                 }
133         }
134
135         /**
136         * Acquire a number of permits. Will not block, but instead invoke a callback
137         * when the specified number of permits are available and has been acquired.
138         *
139         * @param permits           The number of permits to acquire.
140         * @param acquired_callback The callback to invoke when acquired.
141         */
142         void acquire(unsigned int permits, std::function<void()> acquired_callback)
143         {
144                 boost::unique_lock<boost::mutex> lock(mutex_);
145
146                 callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
147         }
148
149         /**
150          * Acquire a number of permits. Will block until the given number of
151          * permits has been acquired if not enough permits are currently available
152          * or the timeout has passed.
153          *
154          * @param permits The number of permits to acquire.
155          * @param timeout The timeout (will be used for each permit).
156          *
157          * @return whether successfully acquired within timeout or not.
158          */
159         template <typename Rep, typename Period>
160         bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
161         {
162                 boost::unique_lock<boost::mutex> lock(mutex_);
163                 auto num_acquired = 0u;
164
165                 while (true)
166                 {
167                         auto num_wanted = permits - num_acquired;
168                         auto to_drain   = std::min(num_wanted, permits_);
169
170                         permits_                -= to_drain;
171                         num_acquired    += to_drain;
172
173                         if (num_acquired == permits)
174                                 break;
175
176                         if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
177                         {
178                                 lock.unlock();
179                                 release(num_acquired);
180                                 return false;
181                         }
182                 }
183
184                 return true;
185         }
186
187         /**
188          * Acquire one permits if permits are currently available. Does not block
189          * until one is available, but returns immediately if unavailable.
190          *
191          * @return true if a permit was acquired or false if no permits where
192          *         currently available.
193          */
194         bool try_acquire()
195         {
196                 boost::unique_lock<boost::mutex> lock(mutex_);
197
198                 if (permits_ == 0u)
199                         return false;
200                 else
201                 {
202                         --permits_;
203
204                         return true;
205                 }
206         }
207
208         /**
209          * @return the current number of permits (may have changed at the time of
210          *         return).
211          */
212         unsigned int permits() const
213         {
214                 boost::unique_lock<boost::mutex> lock(mutex_);
215
216                 return permits_;
217         }
218
219 private:
220         void perform_callback_based_acquire()
221         {
222                 if (callbacks_per_requested_permits_.empty())
223                         return;
224
225                 while (
226                         !callbacks_per_requested_permits_.empty() &&
227                         callbacks_per_requested_permits_.begin()->first <= permits_)
228                 {
229                         auto requested_permits_and_callbacks    = callbacks_per_requested_permits_.begin();
230                         auto requested_permits                                  = requested_permits_and_callbacks->first;
231                         auto& callbacks                                                 = requested_permits_and_callbacks->second;
232
233                         if (callbacks.empty())
234                         {
235                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
236                                 continue;
237                         }
238
239                         auto& callback                                                  = callbacks.front();
240
241                         permits_ -= requested_permits;
242                         mutex_.unlock();
243
244                         try
245                         {
246                                 callback();
247                         }
248                         catch (...)
249                         {
250                         }
251
252                         mutex_.lock();
253                         callbacks.pop();
254
255                         if (callbacks.empty())
256                                 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
257                 }
258         }
259 };
260
261 /**
262  * Enables RAII-style acquire/release on scope exit unless committed.
263  */
264 class acquire_transaction : boost::noncopyable
265 {
266         semaphore& semaphore_;
267         bool committed_;
268 public:
269         /**
270          * Constructor.
271          *
272          * @param semaphore        The semaphore to acquire one permit from.
273          * @param already_acquired Whether a permit has already been acquired or not.
274          */
275         acquire_transaction(semaphore& semaphore, bool already_acquired = false)
276                 : semaphore_(semaphore)
277                 , committed_(false)
278         {
279                 if (!already_acquired)
280                         semaphore_.acquire();
281         }
282
283         /**
284          * Destructor that will release one permit if commit() has not been called.
285          */
286         ~acquire_transaction()
287         {
288                 if (!committed_)
289                         semaphore_.release();
290         }
291
292         /**
293          * Ensure that the acquired permit is kept on destruction.
294          */
295         void commit()
296         {
297                 committed_ = true;
298         }
299 };
300
301 }