2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
26 #include <boost/noncopyable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition_variable.hpp>
37 * Counting semaphore modelled after java.util.concurrent.Semaphore
39 class semaphore : boost::noncopyable
41 mutable boost::mutex mutex_;
42 unsigned int permits_;
43 boost::condition_variable_any permits_available_;
44 std::map<unsigned int, std::queue<std::function<void()>>> callbacks_per_requested_permits_;
49 * @param permits The initial number of permits.
51 explicit semaphore(unsigned int permits)
61 boost::unique_lock<boost::mutex> lock(mutex_);
65 perform_callback_based_acquire();
66 permits_available_.notify_one();
72 * @param permits The number of permits to release.
74 void release(unsigned int permits)
76 boost::unique_lock<boost::mutex> lock(mutex_);
80 perform_callback_based_acquire();
81 permits_available_.notify_all();
85 * Acquire a permit. Will block until one becomes available if no permit is
86 * currently available.
90 boost::unique_lock<boost::mutex> lock(mutex_);
92 while (permits_ == 0u)
94 permits_available_.wait(lock);
101 * Acquire a number of permits. Will block until the given number of
102 * permits has been acquired if not enough permits are currently available.
104 * @param permits The number of permits to acquire.
106 void acquire(unsigned int permits)
108 boost::unique_lock<boost::mutex> lock(mutex_);
109 auto num_acquired = 0u;
113 auto num_wanted = permits - num_acquired;
114 auto to_drain = std::min(num_wanted, permits_);
116 permits_ -= to_drain;
117 num_acquired += to_drain;
119 if (num_acquired == permits)
122 permits_available_.wait(lock);
127 * Acquire a number of permits. Will not block, but instead invoke a callback
128 * when the specified number of permits are available and have been acquired.
130 * @param permits The number of permits to acquire.
131 * @param acquired_callback The callback to invoke when acquired.
133 void acquire(unsigned int permits, std::function<void()> acquired_callback)
135 boost::unique_lock<boost::mutex> lock(mutex_);
137 if (permits_ >= permits)
144 callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
148 * Acquire a number of permits. Will block until the given number of
149 * permits has been acquired if not enough permits are currently available
150 * or the timeout has passed.
152 * @param permits The number of permits to acquire.
153 * @param timeout The timeout (applied to each wait for permits, not to the call as a whole).
155 * @return whether successfully acquired within timeout or not.
157 template <typename Rep, typename Period>
158 bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
160 boost::unique_lock<boost::mutex> lock(mutex_);
161 auto num_acquired = 0u;
165 auto num_wanted = permits - num_acquired;
166 auto to_drain = std::min(num_wanted, permits_);
168 permits_ -= to_drain;
169 num_acquired += to_drain;
171 if (num_acquired == permits)
174 if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
177 release(num_acquired);
186 * Acquire one permit if one is currently available. Does not block
187 * until one is available, but returns immediately if unavailable.
189 * @return true if a permit was acquired or false if no permits were
190 * currently available.
194 boost::unique_lock<boost::mutex> lock(mutex_);
207 * @return the current number of permits (may have changed at the time of
210 unsigned int permits() const
212 boost::unique_lock<boost::mutex> lock(mutex_);
218 void perform_callback_based_acquire()
220 if (callbacks_per_requested_permits_.empty())
224 !callbacks_per_requested_permits_.empty() &&
225 callbacks_per_requested_permits_.begin()->first <= permits_)
227 auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin();
228 auto requested_permits = requested_permits_and_callbacks->first;
229 auto& callbacks = requested_permits_and_callbacks->second;
231 if (callbacks.empty())
233 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
237 auto& callback = callbacks.front();
239 permits_ -= requested_permits;
253 if (callbacks.empty())
254 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
260 * Enables RAII-style acquire/release on scope exit unless committed.
262 class acquire_transaction : boost::noncopyable
264 semaphore& semaphore_;
270 * @param semaphore The semaphore to acquire one permit from.
271 * @param already_acquired Whether a permit has already been acquired or not.
273 acquire_transaction(semaphore& semaphore, bool already_acquired = false)
274 : semaphore_(semaphore)
277 if (!already_acquired)
278 semaphore_.acquire();
282 * Destructor that will release one permit if commit() has not been called.
284 ~acquire_transaction()
287 semaphore_.release();
291 * Ensure that the acquired permit is kept on destruction.