2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
26 #include <boost/noncopyable.hpp>
33 #include <boost/thread/condition_variable.hpp>
38 * Counting semaphore modelled after java.util.concurrent.Semaphore
40 class semaphore : boost::noncopyable
42 mutable std::mutex mutex_;
43 unsigned int permits_;
44 boost::condition_variable_any permits_available_;
45 std::map<unsigned int, std::queue<std::function<void()>>> callbacks_per_requested_permits_;
50 * @param permits The initial number of permits.
52 explicit semaphore(unsigned int permits)
62 std::unique_lock<std::mutex> lock(mutex_);
66 perform_callback_based_acquire();
67 permits_available_.notify_one();
73 * @param permits The number of permits to release.
75 void release(unsigned int permits)
77 std::unique_lock<std::mutex> lock(mutex_);
81 perform_callback_based_acquire();
82 permits_available_.notify_all();
86 * Acquire a permit. Will block until one becomes available if no permit is
87 * currently available.
91 std::unique_lock<std::mutex> lock(mutex_);
93 while (permits_ == 0u)
95 permits_available_.wait(lock);
102 * Acquire a number of permits. Will block until the given number of
103 * permits has been acquired if not enough permits are currently available.
105 * @param permits The number of permits to acquire.
107 void acquire(unsigned int permits)
109 std::unique_lock<std::mutex> lock(mutex_);
110 auto num_acquired = 0u;
114 auto num_wanted = permits - num_acquired;
115 auto to_drain = std::min(num_wanted, permits_);
117 permits_ -= to_drain;
118 num_acquired += to_drain;
120 if (num_acquired == permits)
123 permits_available_.wait(lock);
128 * Acquire a number of permits. Will not block, but instead invoke a callback
129 * when the specified number of permits are available and has been acquired.
131 * @param permits The number of permits to acquire.
132 * @param acquired_callback The callback to invoke when acquired.
134 void acquire(unsigned int permits, std::function<void()> acquired_callback)
136 std::unique_lock<std::mutex> lock(mutex_);
138 if (permits_ >= permits)
145 callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
149 * Acquire a number of permits. Will block until the given number of
150 * permits has been acquired if not enough permits are currently available
151 * or the timeout has passed.
153 * @param permits The number of permits to acquire.
154 * @param timeout The timeout (will be used for each permit).
156 * @return whether successfully acquired within timeout or not.
158 template <typename Rep, typename Period>
159 bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
161 std::unique_lock<std::mutex> lock(mutex_);
162 auto num_acquired = 0u;
166 auto num_wanted = permits - num_acquired;
167 auto to_drain = std::min(num_wanted, permits_);
169 permits_ -= to_drain;
170 num_acquired += to_drain;
172 if (num_acquired == permits)
175 if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
178 release(num_acquired);
187 * Acquire one permits if permits are currently available. Does not block
188 * until one is available, but returns immediately if unavailable.
190 * @return true if a permit was acquired or false if no permits where
191 * currently available.
195 std::unique_lock<std::mutex> lock(mutex_);
208 * @return the current number of permits (may have changed at the time of
211 unsigned int permits() const
213 std::unique_lock<std::mutex> lock(mutex_);
219 void perform_callback_based_acquire()
221 if (callbacks_per_requested_permits_.empty())
225 !callbacks_per_requested_permits_.empty() &&
226 callbacks_per_requested_permits_.begin()->first <= permits_)
228 auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin();
229 auto requested_permits = requested_permits_and_callbacks->first;
230 auto& callbacks = requested_permits_and_callbacks->second;
232 if (callbacks.empty())
234 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
238 auto& callback = callbacks.front();
240 permits_ -= requested_permits;
254 if (callbacks.empty())
255 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
261 * Enables RAII-style acquire/release on scope exit unless committed.
263 class acquire_transaction : boost::noncopyable
265 semaphore& semaphore_;
271 * @param semaphore The semaphore to acquire one permit from.
272 * @param already_acquired Whether a permit has already been acquired or not.
274 acquire_transaction(semaphore& semaphore, bool already_acquired = false)
275 : semaphore_(semaphore)
278 if (!already_acquired)
279 semaphore_.acquire();
283 * Destructor that will release one permit if commit() has not been called.
285 ~acquire_transaction()
288 semaphore_.release();
292 * Ensure that the acquired permit is kept on destruction.