2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
4 * This file is part of CasparCG (www.casparcg.com).
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
19 * Author: Helge Norberg, helge.norberg@svt.se
26 #include <boost/noncopyable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition_variable.hpp>
// Helper template: takes a repeat count of any type N that can be initialised
// from 0, compared with `<` and pre-incremented, plus a callable `func` that is
// received by const reference.
36 template <class N, class Func>
37 void repeat_n(N times_to_repeat_block, const Func& func)
// Counts i from 0 up to (not including) times_to_repeat_block. The loop body
// is not visible in this excerpt -- presumably it invokes func() once per
// iteration; confirm against the full file.
39 for (N i = 0; i < times_to_repeat_block; ++i)
46 * Counting semaphore modelled after java.util.concurrent.Semaphore
// Derives from boost::noncopyable. The data members below are shared by all
// member functions of the class.
48 class semaphore : boost::noncopyable
// Locked by the member functions before they touch the state below. Declared
// mutable so that const members such as permits() can lock it too.
50 mutable boost::mutex mutex_;
// Number of permits currently available.
51 unsigned int permits_;
// Blocking acquirers wait on this; the release functions notify it.
52 boost::condition_variable_any permits_available_;
// Pending callbacks of the callback-based acquire overload, keyed by the
// number of permits each callback asked for. std::map keeps the keys ordered,
// so begin() always refers to the smallest pending request.
53 std::map<unsigned int, std::queue<std::function<void()>>> callbacks_per_requested_permits_;
58 * @param permits The initial number of permits.
// Constructor taking the initial permit count. `explicit` prevents an
// unsigned int from converting implicitly into a semaphore. The member
// initialiser list and body are not visible in this excerpt.
60 explicit semaphore(unsigned int permits)
// Body fragment of a release operation. Its signature and the statement that
// updates permits_ are not visible in this excerpt (presumably this is the
// no-argument release() that acquire_transaction calls -- confirm).
// Hold mutex_ for the remainder of the enclosing scope.
70 boost::unique_lock<boost::mutex> lock(mutex_);
// Give queued callback-based requests a chance to consume permits first.
74 perform_callback_based_acquire();
// Wake at most one thread blocked on permits_available_.
75 permits_available_.notify_one();
81 * @param permits The number of permits to release.
// Release overload that takes the number of permits to give back.
83 void release(unsigned int permits)
// Hold mutex_ until the function returns.
85 boost::unique_lock<boost::mutex> lock(mutex_);
// (The statement that updates permits_ is not visible in this excerpt.)
// Give queued callback-based requests a chance to consume permits before any
// blocked thread is woken.
89 perform_callback_based_acquire();
// Hands repeat_n a lambda that wakes one waiter per call, with `permits` as
// the repeat count -- i.e. one notify_one() per released permit, assuming
// repeat_n invokes the lambda once per iteration.
90 repeat_n(permits, [this] { permits_available_.notify_one(); });
94 * Acquire a permit. Will block until one becomes available if no permit is
95 * currently available.
// Body fragment of the single-permit blocking acquire; the signature and the
// statements after the loop are not visible in this excerpt.
99 boost::unique_lock<boost::mutex> lock(mutex_);
// A loop rather than a single check: the permit count is re-tested after
// every wake-up, since another thread may have taken the permit in between.
101 while (permits_ == 0u)
// Blocks on the condition variable; `lock` is handed to wait() so the mutex
// is not held while this thread sleeps.
103 permits_available_.wait(lock);
110 * Acquire a number of permits. Will block until the given number of
111 * permits has been acquired if not enough permits are currently available.
113 * @param permits The number of permits to acquire.
// Blocking acquire of `permits` permits. Permits are collected incrementally:
// whatever is currently available (up to the amount still missing) is taken,
// then the thread waits for more.
115 void acquire(unsigned int permits)
117 boost::unique_lock<boost::mutex> lock(mutex_);
// Running total of permits taken so far by this call.
118 auto num_acquired = 0u;
// (The loop statement enclosing the lines below is not visible in this
// excerpt.)
// How many permits are still missing.
122 auto num_wanted = permits - num_acquired;
// Take what is available, but never more than is still wanted.
123 auto to_drain = std::min(num_wanted, permits_);
125 permits_ -= to_drain;
126 num_acquired += to_drain;
// All requested permits are now held. The statement guarded by this check is
// not visible here (presumably it leaves the loop -- confirm).
128 if (num_acquired == permits)
// Not enough yet: block until a release notifies the condition variable.
131 permits_available_.wait(lock);
136 * Acquire a number of permits. Will not block, but instead invoke a callback
137 * when the specified number of permits are available and have been acquired.
139 * @param permits The number of permits to acquire.
140 * @param acquired_callback The callback to invoke when acquired.
// Callback-based acquire: registers `acquired_callback` for a request of
// `permits` permits instead of blocking the caller.
142 void acquire(unsigned int permits, std::function<void()> acquired_callback)
144 boost::unique_lock<boost::mutex> lock(mutex_);
// Append the callback to the queue for this permit count (operator[] creates
// the queue on first use). The callback is taken by value and moved into the
// queue. Queued entries are consumed by perform_callback_based_acquire().
// NOTE(review): no check of the currently available permits is visible in
// this excerpt -- confirm whether a request that could be satisfied right
// away is only served on the next release.
146 callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
150 * Acquire a number of permits. Will block until the given number of
151 * permits has been acquired if not enough permits are currently available
152 * or the timeout has passed.
154 * @param permits The number of permits to acquire.
155 * @param timeout The timeout (will be used for each permit).
157 * @return whether successfully acquired within timeout or not.
// Timed acquire of `permits` permits. Templated on the representation and
// period of the boost::chrono duration so any duration type can be passed.
159 template <typename Rep, typename Period>
160 bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
162 boost::unique_lock<boost::mutex> lock(mutex_);
// Running total of permits taken so far by this call.
163 auto num_acquired = 0u;
// (The loop statement enclosing the lines below is not visible in this
// excerpt.)
// How many permits are still missing.
167 auto num_wanted = permits - num_acquired;
// Take what is available, but never more than is still wanted.
168 auto to_drain = std::min(num_wanted, permits_);
170 permits_ -= to_drain;
171 num_acquired += to_drain;
// All requested permits are now held. The guarded statement is not visible
// here (presumably the success return -- confirm).
173 if (num_acquired == permits)
// The full `timeout` is passed to every wait_for() call, so it bounds each
// individual wait rather than the call as a whole.
176 if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
// Timed out: hand back the permits collected so far.
// NOTE(review): release(unsigned int) locks mutex_ itself and boost::mutex is
// not recursive. The lines between the timeout check and this call are not
// visible in this excerpt -- confirm that `lock` is released before the call.
179 release(num_acquired);
188 * Acquire one permit if one is currently available. Does not block
189 * until one is available, but returns immediately if unavailable.
191 * @return true if a permit was acquired or false if no permits were
192 * currently available.
// Only visible statement of the non-blocking single-permit try-acquire: it
// takes mutex_ for the enclosing scope. The signature and the rest of the
// body are not visible in this excerpt.
196 boost::unique_lock<boost::mutex> lock(mutex_);
209 * @return the current number of permits (may have changed at the time of
// Const accessor for the permit count.
212 unsigned int permits() const
// Locking inside a const member is possible because mutex_ is declared
// mutable. The return statement is not visible in this excerpt.
214 boost::unique_lock<boost::mutex> lock(mutex_);
// Serves queued callback-based acquire requests for as long as the smallest
// pending request fits into the available permits. The release functions call
// this after they have locked mutex_.
220 void perform_callback_based_acquire()
// Nothing queued. The guarded statement is not visible in this excerpt
// (presumably an early return -- confirm).
222 if (callbacks_per_requested_permits_.empty())
// Condition of a loop/branch whose keyword line is not visible here. The map
// is ordered by key, so begin() is the request needing the fewest permits.
226 !callbacks_per_requested_permits_.empty() &&
227 callbacks_per_requested_permits_.begin()->first <= permits_)
// Iterator to the smallest pending request, its permit count and its queue.
229 auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin();
230 auto requested_permits = requested_permits_and_callbacks->first;
231 auto& callbacks = requested_permits_and_callbacks->second;
// Drop a map entry whose queue holds no callbacks.
233 if (callbacks.empty())
235 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
// Reference to the oldest callback queued for this permit count.
239 auto& callback = callbacks.front();
// Consume the permits on behalf of that callback.
241 permits_ -= requested_permits;
// (The statements that follow in the full file are not visible in this
// excerpt -- presumably they invoke the callback and pop it from the queue;
// confirm.)
// Remove the map entry once its queue has been emptied, so that begin() moves
// on to the next-smallest request.
255 if (callbacks.empty())
256 callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
262 * Enables RAII-style acquire/release on scope exit unless committed.
// Derives from boost::noncopyable.
264 class acquire_transaction : boost::noncopyable
// The semaphore this object operates on. It is stored as a reference, so the
// semaphore must outlive this object.
266 semaphore& semaphore_;
272 * @param semaphore The semaphore to acquire one permit from.
273 * @param already_acquired Whether a permit has already been acquired or not.
// Binds this object to `semaphore`. already_acquired defaults to false, in
// which case the constructor calls acquire() on the semaphore itself.
275 acquire_transaction(semaphore& semaphore, bool already_acquired = false)
// (Any further member initialisers are not visible in this excerpt.)
276 : semaphore_(semaphore)
// Skip the acquire when the caller states a permit is already held.
279 if (!already_acquired)
280 semaphore_.acquire();
284 * Destructor that will release one permit if commit() has not been called.
// Destructor: gives a permit back to the semaphore.
286 ~acquire_transaction()
// The condition guarding this call is not visible in this excerpt; per the
// doc comment, the release only happens if commit() has not been called.
289 semaphore_.release();
293 * Ensure that the acquired permit is kept on destruction.