#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <map>
+#include <queue>
+#include <functional>
+
namespace caspar {
template <class N, class Func>
*/
class semaphore : boost::noncopyable
{
- mutable boost::mutex mutex_;
- unsigned int permits_;
- boost::condition_variable_any permits_available_;
+ mutable boost::mutex mutex_;
+ unsigned int permits_;
+ boost::condition_variable_any permits_available_;
+ std::map<unsigned int, std::queue<std::function<void()>>> callbacks_per_requested_permits_;
public:
/**
* Constructor.
++permits_;
+ perform_callback_based_acquire();
permits_available_.notify_one();
}
permits_ += permits;
+ perform_callback_based_acquire();
repeat_n(permits, [this] { permits_available_.notify_one(); });
}
while (true)
{
- auto num_wanted = permits - num_acquired;
- auto to_drain = std::min(num_wanted, permits_);
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
- permits_ -= to_drain;
- num_acquired += to_drain;
+ permits_ -= to_drain;
+ num_acquired += to_drain;
if (num_acquired == permits)
break;
}
}
+ /**
+ * Acquire a number of permits. Will not block, but instead invoke a callback
+ * when the specified number of permits are available and has been acquired.
+ *
+ * @param permits The number of permits to acquire.
+ * @param acquired_callback The callback to invoke when acquired.
+ */
+ void acquire(unsigned int permits, std::function<void()> acquired_callback)
+ {
+ boost::unique_lock<boost::mutex> lock(mutex_);
+
+ callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
+ }
+
/**
* Acquire a number of permits. Will block until the given number of
* permits has been acquired if not enough permits are currently available
while (true)
{
- auto num_wanted = permits - num_acquired;
- auto to_drain = std::min(num_wanted, permits_);
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
- permits_ -= to_drain;
- num_acquired += to_drain;
+ permits_ -= to_drain;
+ num_acquired += to_drain;
if (num_acquired == permits)
break;
return permits_;
}
+
+private:
+ void perform_callback_based_acquire()
+ {
+ if (callbacks_per_requested_permits_.empty())
+ return;
+
+ while (
+ !callbacks_per_requested_permits_.empty() &&
+ callbacks_per_requested_permits_.begin()->first <= permits_)
+ {
+ auto requested_permits_and_callbacks = callbacks_per_requested_permits_.begin();
+ auto requested_permits = requested_permits_and_callbacks->first;
+ auto& callbacks = requested_permits_and_callbacks->second;
+
+ if (callbacks.empty())
+ {
+ callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+ continue;
+ }
+
+ auto& callback = callbacks.front();
+
+ permits_ -= requested_permits;
+ mutex_.unlock();
+
+ try
+ {
+ callback();
+ }
+ catch (...)
+ {
+ }
+
+ mutex_.lock();
+ callbacks.pop();
+
+ if (callbacks.empty())
+ callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+ }
+ }
};
/**