]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../utility/assert.h"\r
5 #include "../log/log.h"\r
6 \r
7 #include <tbb/atomic.h>\r
8 #include <tbb/concurrent_queue.h>\r
9 \r
10 #include <boost/thread.hpp>\r
11 #include <boost/noncopyable.hpp>\r
12 \r
13 #include <functional>\r
14 \r
15 namespace caspar {\r
16 \r
17 namespace detail {\r
18 \r
19 typedef struct tagTHREADNAME_INFO\r
20 {\r
21         DWORD dwType; // must be 0x1000\r
22         LPCSTR szName; // pointer to name (in user addr space)\r
23         DWORD dwThreadID; // thread ID (-1=caller thread)\r
24         DWORD dwFlags; // reserved for future use, must be zero\r
25 } THREADNAME_INFO;\r
26 \r
27 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
28 {\r
29         THREADNAME_INFO info;\r
30         {\r
31                 info.dwType = 0x1000;\r
32                 info.szName = szThreadName;\r
33                 info.dwThreadID = dwThreadID;\r
34                 info.dwFlags = 0;\r
35         }\r
36         __try\r
37         {\r
38                 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
39         }\r
40         __except (EXCEPTION_CONTINUE_EXECUTION){}       \r
41 }\r
42 \r
43 }\r
44 \r
45 class executor : boost::noncopyable\r
46 {\r
47         const std::string name_;\r
48         boost::thread thread_;\r
49         tbb::atomic<bool> is_running_;\r
50         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
51 public:\r
52                 \r
53         explicit executor(const std::wstring& name) : name_(narrow(name))\r
54         {\r
55                 is_running_ = false;\r
56         }\r
57         \r
58         virtual ~executor()\r
59         {\r
60                 stop();\r
61                 clear();\r
62                 if(boost::this_thread::get_id() != thread_.get_id())\r
63                         thread_.join();\r
64         }\r
65 \r
66         void set_capacity(size_t capacity)\r
67         {\r
68                 execution_queue_.set_capacity(capacity);\r
69         }\r
70 \r
71         void start() // noexcept\r
72         {\r
73                 if(is_running_.fetch_and_store(true))\r
74                         return;\r
75                 clear();\r
76                 thread_ = boost::thread([this]{run();});\r
77         }\r
78                         \r
79         void stop() // noexcept\r
80         {\r
81                 is_running_ = false;    \r
82                 execution_queue_.try_push([]{});\r
83         }\r
84         \r
85         void clear()\r
86         {\r
87                 std::function<void()> func;\r
88                 auto size = execution_queue_.size();\r
89                 for(int n = 0; n < size; ++n)\r
90                 {\r
91                         try\r
92                         {\r
93                                 if(!execution_queue_.try_pop(func))\r
94                                         return;\r
95                         }\r
96                         catch(boost::broken_promise&){}\r
97                 }\r
98         }\r
99                         \r
100         template<typename Func>\r
101         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
102         {       \r
103                 typedef boost::packaged_task<decltype(func())> task_type;\r
104                                 \r
105                 auto task = task_type(std::forward<Func>(func));\r
106                 auto future = task.get_future();\r
107                 \r
108                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
109                 {\r
110                         try\r
111                         {\r
112                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
113                                         my_task();\r
114                         }\r
115                         catch(boost::task_already_started&){}\r
116                 }));\r
117                                 \r
118                 struct task_adaptor_t\r
119                 {\r
120                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
121                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
122                         void operator()() const { task(); }\r
123                         mutable task_type task;\r
124                 } task_adaptor(std::move(task));\r
125 \r
126                 execution_queue_.push([=]\r
127                 {\r
128                         try{task_adaptor();}\r
129                         catch(boost::task_already_started&){}\r
130                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
131                 });\r
132                                         \r
133                 return std::move(future);               \r
134         }\r
135         \r
136         template<typename Func>\r
137         auto invoke(Func&& func) -> decltype(func())\r
138         {\r
139                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
140                         return func();\r
141                 \r
142                 return begin_invoke(std::forward<Func>(func)).get();\r
143         }\r
144         \r
145         tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
146         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
147         bool empty() const              { return execution_queue_.empty();      }\r
148         bool is_running() const { return is_running_;                           }       \r
149                 \r
150 private:\r
151         \r
152         void execute() // noexcept\r
153         {\r
154                 std::function<void()> func;\r
155                 execution_queue_.pop(func);     \r
156                 func();\r
157         }\r
158 \r
159         void run() // noexcept\r
160         {\r
161                 win32_exception::install_handler();             \r
162                 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
163                 while(is_running_)\r
164                         execute();\r
165         }       \r
166 };\r
167 \r
168 }