]> git.sesse.net Git - casparcg/blobdiff - common/semaphore.h
Fixed compilation problem on Linux.
[casparcg] / common / semaphore.h
index 1d94f015c836c74cb0439596bcdd17de3fd19293..ad6de09f829577eccf3d16bb94484839e9988bc7 100644 (file)
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 
-namespace caspar {
+#include <map>
+#include <queue>
+#include <functional>
 
-template <class N, class Func>
-void repeat_n(N times_to_repeat_block, const Func& func)
-{
-       for (N i = 0; i < times_to_repeat_block; ++i)
-       {
-               func();
-       }
-}
+namespace caspar {
 
 /**
  * Counting semaphore modelled after java.util.concurrent.Semaphore
  */
 class semaphore : boost::noncopyable
 {
-       mutable boost::mutex mutex_;
-       unsigned int permits_;
-       boost::condition_variable_any permits_available_;
+       mutable boost::mutex                                                                            mutex_;
+       unsigned int                                                                                            permits_;
+       boost::condition_variable_any                                                           permits_available_;
+       std::map<unsigned int, std::queue<std::function<void()>>>       callbacks_per_requested_permits_;
 public:
        /**
         * Constructor.
         *
         * @param permits The initial number of permits.
         */
-       semaphore(unsigned int permits)
+       explicit semaphore(unsigned int permits)
                : permits_(permits)
        {
        }
@@ -66,6 +62,7 @@ public:
 
                ++permits_;
 
+               perform_callback_based_acquire();
                permits_available_.notify_one();
        }
 
@@ -80,7 +77,8 @@ public:
 
                permits_ += permits;
 
-               repeat_n(permits, [this] { permits_available_.notify_one(); });
+               perform_callback_based_acquire();
+               permits_available_.notify_all();
        }
 
        /**
@@ -112,11 +110,11 @@ public:
 
                while (true)
                {
-                       auto num_wanted = permits - num_acquired;
-                       auto to_drain = std::min(num_wanted, permits_);
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain   = std::min(num_wanted, permits_);
 
-                       permits_ -= to_drain;
-                       num_acquired += to_drain;
+                       permits_                -= to_drain;
+                       num_acquired    += to_drain;
 
                        if (num_acquired == permits)
                                break;
@@ -125,6 +123,65 @@ public:
                }
        }
 
+       /**
+       * Acquire a number of permits. Will not block, but instead invoke a callback
+       * when the specified number of permits are available and has been acquired.
+       *
+       * @param permits           The number of permits to acquire.
+       * @param acquired_callback The callback to invoke when acquired.
+       */
+       void acquire(unsigned int permits, std::function<void()> acquired_callback)
+       {
+               boost::unique_lock<boost::mutex> lock(mutex_);
+
+               if (permits_ >= permits)
+               {
+                       permits_ -= permits;
+                       lock.unlock();
+                       acquired_callback();
+               }
+               else
+                       callbacks_per_requested_permits_[permits].push(std::move(acquired_callback));
+       }
+
+       /**
+        * Acquire a number of permits. Will block until the given number of
+        * permits has been acquired if not enough permits are currently available
+        * or the timeout has passed.
+        *
+        * @param permits The number of permits to acquire.
+        * @param timeout The timeout (will be used for each permit).
+        *
+        * @return whether successfully acquired within timeout or not.
+        */
+       template <typename Rep, typename Period>
+       bool try_acquire(unsigned int permits, const boost::chrono::duration<Rep, Period>& timeout)
+       {
+               boost::unique_lock<boost::mutex> lock(mutex_);
+               auto num_acquired = 0u;
+
+               while (true)
+               {
+                       auto num_wanted = permits - num_acquired;
+                       auto to_drain   = std::min(num_wanted, permits_);
+
+                       permits_                -= to_drain;
+                       num_acquired    += to_drain;
+
+                       if (num_acquired == permits)
+                               break;
+
+                       if (permits_available_.wait_for(lock, timeout) == boost::cv_status::timeout)
+                       {
+                               lock.unlock();
+                               release(num_acquired);
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
        /**
         * Acquire one permits if permits are currently available. Does not block
         * until one is available, but returns immediately if unavailable.
@@ -156,6 +213,47 @@ public:
 
                return permits_;
        }
+
+private:
+       void perform_callback_based_acquire()
+       {
+               if (callbacks_per_requested_permits_.empty())
+                       return;
+
+               while (
+                       !callbacks_per_requested_permits_.empty() &&
+                       callbacks_per_requested_permits_.begin()->first <= permits_)
+               {
+                       auto requested_permits_and_callbacks    = callbacks_per_requested_permits_.begin();
+                       auto requested_permits                                  = requested_permits_and_callbacks->first;
+                       auto& callbacks                                                 = requested_permits_and_callbacks->second;
+
+                       if (callbacks.empty())
+                       {
+                               callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+                               continue;
+                       }
+
+                       auto& callback                                                  = callbacks.front();
+
+                       permits_ -= requested_permits;
+                       mutex_.unlock();
+
+                       try
+                       {
+                               callback();
+                       }
+                       catch (...)
+                       {
+                       }
+
+                       mutex_.lock();
+                       callbacks.pop();
+
+                       if (callbacks.empty())
+                               callbacks_per_requested_permits_.erase(requested_permits_and_callbacks);
+               }
+       }
 };
 
 /**