]> git.sesse.net Git - casparcg/blob - common/concurrency/governor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / governor.h
1 #pragma once\r
2 \r
3 #include "../memory/safe_ptr.h"\r
4 \r
5 #include <concrt.h>\r
6 #include <concurrent_queue.h>\r
7 #include <tbb/atomic.h>\r
8 \r
9 #include <boost/noncopyable.hpp>\r
10 \r
11 #include <vector>\r
12 \r
13 namespace caspar {\r
14                 \r
15 #undef Yield\r
16 \r
17 typedef std::vector<safe_ptr<int>> ticket_t;\r
18 \r
19 class governor : boost::noncopyable\r
20 {\r
21         struct impl : std::enable_shared_from_this<impl>\r
22         {\r
23                 tbb::atomic<int> count_;\r
24                 tbb::atomic<int> is_running_;\r
25                 Concurrency::concurrent_queue<Concurrency::Context*> waiting_contexts_;\r
26 \r
27                 void acquire_ticket()\r
28                 {\r
29                         if(!is_running_)\r
30                                 return;\r
31 \r
32                         if(count_ < 1)\r
33                                 Concurrency::Context::Yield();\r
34 \r
35                         if (--count_ < 0)\r
36                         {\r
37                                 auto context = Concurrency::Context::CurrentContext();\r
38                                 waiting_contexts_.push(context);\r
39                                 context->Block();\r
40                         }\r
41                 }\r
42 \r
43                 void release_ticket()\r
44                 {\r
45                         if(!is_running_)\r
46                                 return;\r
47 \r
48                         if(++count_ <= 0)\r
49                         {\r
50                                 Concurrency:: Context* waiting = NULL;\r
51                                 while(!waiting_contexts_.try_pop(waiting))\r
52                                         Concurrency::Context::Yield();\r
53                                 waiting->Unblock();\r
54                         }\r
55                 }\r
56 \r
57         public:\r
58 \r
59                 impl(size_t capacity) \r
60                 {\r
61                         is_running_ = true;\r
62                         count_ = capacity;\r
63                 }\r
64         \r
65                 ticket_t acquire()\r
66                 {\r
67                         acquire_ticket();\r
68                 \r
69                         auto self = shared_from_this();\r
70                         ticket_t ticket;\r
71                         ticket.push_back(safe_ptr<int>(new int, [this, self](int* p)\r
72                         {\r
73                                 delete p;\r
74                                 release_ticket();\r
75                         }));\r
76                         return ticket;\r
77                 }\r
78 \r
79                 void cancel()\r
80                 {\r
81                         is_running_ = false;\r
82                         Concurrency::Context* waiting = NULL;\r
83                         while(waiting_contexts_.try_pop(waiting))\r
84                                 waiting->Unblock();\r
85                 }\r
86         };\r
87 \r
88 public:\r
89         governor(size_t capacity) : impl_(new impl(capacity))\r
90         {\r
91         }\r
92         \r
93         ticket_t acquire()\r
94         {\r
95                 return impl_->acquire();\r
96         }\r
97 \r
98         void cancel()\r
99         {\r
100                 impl_->cancel();\r
101         }\r
102 \r
103 private:\r
104         safe_ptr<impl> impl_;\r
105 \r
106 };\r
107 \r
108 }