#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <map>
+#include <queue>
+#include <functional>
+
namespace caspar {
template <class N, class Func>
*/
class semaphore : boost::noncopyable
{
- mutable boost::mutex mutex_;
- unsigned int permits_;
- boost::condition_variable_any permits_available_;
+ mutable boost::mutex mutex_;
+ unsigned int permits_;
+ boost::condition_variable_any permits_available_;
+ std::map<unsigned int, std::queue<std::function<void()>>> callbacks_per_requested_permits_;
public:
/**
* Constructor.
*
* @param permits The initial number of permits.
*/
- semaphore(unsigned int permits)
+ explicit semaphore(unsigned int permits)
: permits_(permits)
{
}
++permits_;
+ perform_callback_based_acquire();
permits_available_.notify_one();
}
permits_ += permits;
+ perform_callback_based_acquire();
repeat_n(permits, [this] { permits_available_.notify_one(); });
}
while (true)
{
- auto num_wanted = permits - num_acquired;
- auto to_drain = std::min(num_wanted, permits_);
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
- permits_ -= to_drain;
- num_acquired += to_drain;
+ permits_ -= to_drain;
+ num_acquired += to_drain;
if (num_acquired == permits)
break;
}
}
+ /**
+ * Acquire a number of permits. Will not block, but instead invoke a callback
+ * when the specified number of permits are available and has been acquired.
+ *
+ * @param permits The number of permits to acquire.
+ * @param acquired_callback The callback to invoke when acquired.
+ */
+ void acquire(unsigned int permits, std::function<void()> acquired_callback)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
+ }
+
+ /**
+ * Acquire a number of permits. Will block until the given number of
+ * permits has been acquired if not enough permits are currently available
+ * or the timeout has passed.
+ *
+ * @param permits The number of permits to acquire.
+ * @param timeout The timeout (will be used for each permit).
+ *
+ * @return whether successfully acquired within timeout or not.
+ */
+ template <typename Rep, typename Period>
+ bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+ auto num_acquired = 0u;
+
+ while (true)
+ {
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
+
+ permits_ -= to_drain;
+ num_acquired += to_drain;
+
+ if (num_acquired == permits)
+ break;
+
+ if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
+ {
+ lock.unlock();
+ release(num_acquired);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
/**
* Acquire one permits if permits are currently available. Does not block
* until one is available, but returns immediately if unavailable.
return permits_;
}
+
+private:
+ void perform_callback_based_acquire()
+ {
+ if (callbacks_per_requested_permits_.empty())
+ return;
+
+ while (
+ !callbacks_per_requested_permits_.empty() &&
+ callbacks_per_requested_permits_.begin()->first <= permits_)
+ {
+ auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin();
+ auto requested_permits = requested_permits_and_callbacks->first;
+ auto& callbacks = requested_permits_and_callbacks->second;
+
+ if (callbacks.empty())
+ {
+ callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+ continue;
+ }
+
+ auto& callback = callbacks.front();
+
+ permits_ -= requested_permits;
+ mutex_.unlock();
+
+ try
+ {
+ callback();
+ }
+ catch (...)
+ {
+ }
+
+ mutex_.lock();
+ callbacks.pop();
+
+ if (callbacks.empty())
+ callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+ }
+ }
};
/**