/* linux/workqueue.c - pthread-based userspace shim for the kernel workqueue API (bcachefs-tools) */
#include <pthread.h>

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/workqueue.h>

static pthread_mutex_t	wq_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t	work_finished = PTHREAD_COND_INITIALIZER;
static LIST_HEAD(wq_list);
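
/*
 * Concurrency model: a single global mutex (wq_lock) protects all
 * workqueue state - the list of workqueues, each queue's pending_work
 * list and current_work pointer. The per-work PENDING bit is set and
 * cleared atomically, but in this file always under wq_lock. The
 * work_finished condvar is broadcast by the worker after each work item
 * completes, waking flushers and cancellers.
 */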

struct workqueue_struct {
	struct list_head	list;

	struct work_struct	*current_work;	/* item the worker is running, if any */
	struct list_head	pending_work;	/* queued but not yet started */

	struct task_struct	*worker;
	char			name[24];
};

enum {
	WORK_PENDING_BIT,
};

static bool work_pending(struct work_struct *work)
{
	return test_bit(WORK_PENDING_BIT, work_data_bits(work));
}

static void clear_work_pending(struct work_struct *work)
{
	clear_bit(WORK_PENDING_BIT, work_data_bits(work));
}

static bool set_work_pending(struct work_struct *work)
{
	return !test_and_set_bit(WORK_PENDING_BIT, work_data_bits(work));
}
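
/*
 * As in the kernel, the PENDING bit is set while a work item is queued
 * (or its delayed-work timer is armed) and cleared by the worker just
 * before work->func runs, so a work item may requeue itself from its
 * own callback. set_work_pending() returns true only for the caller
 * that won the race to set the bit.
 */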

static void __queue_work(struct workqueue_struct *wq,
			 struct work_struct *work)
{
	BUG_ON(!work_pending(work));
	BUG_ON(!list_empty(&work->entry));

	list_add_tail(&work->entry, &wq->pending_work);
	wake_up_process(wq->worker);
}

bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	bool ret;

	pthread_mutex_lock(&wq_lock);
	if ((ret = set_work_pending(work)))
		__queue_work(wq, work);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}
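
/*
 * Example usage (hypothetical caller, not part of this file; assumes the
 * usual INIT_WORK() macro from the accompanying linux/workqueue.h):
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		// runs on the workqueue's worker thread
 *	}
 *
 *	struct work_struct my_work;
 *
 *	INIT_WORK(&my_work, my_work_fn);
 *	queue_work(system_wq, &my_work);	// false if already pending
 */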

/* timer callback for delayed work - queues the work once the delay expires */
void delayed_work_timer_fn(struct timer_list *timer)
{
	struct delayed_work *dwork =
		container_of(timer, struct delayed_work, timer);

	pthread_mutex_lock(&wq_lock);
	__queue_work(dwork->wq, &dwork->work);
	pthread_mutex_unlock(&wq_lock);
}

static void __queue_delayed_work(struct workqueue_struct *wq,
				 struct delayed_work *dwork,
				 unsigned long delay)
{
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	BUG_ON(timer->function != delayed_work_timer_fn);
	BUG_ON(timer_pending(timer));
	BUG_ON(!list_empty(&work->entry));

	if (!delay) {
		__queue_work(wq, &dwork->work);
	} else {
		dwork->wq = wq;
		timer->expires = jiffies + delay;
		add_timer(timer);
	}
}

bool queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork,
			unsigned long delay)
{
	struct work_struct *work = &dwork->work;
	bool ret;

	pthread_mutex_lock(&wq_lock);
	if ((ret = set_work_pending(work)))
		__queue_delayed_work(wq, dwork, delay);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}
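
/*
 * Example usage (hypothetical caller, not part of this file; assumes the
 * usual INIT_DELAYED_WORK() macro): run a callback roughly one second
 * from now, with the delay expressed in jiffies.
 *
 *	struct delayed_work my_dwork;
 *
 *	INIT_DELAYED_WORK(&my_dwork, my_work_fn);
 *	queue_delayed_work(system_wq, &my_dwork, HZ);
 */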

/*
 * Try to take ownership of @work's PENDING bit for a canceller. Returns
 * false if the work was idle (we just claimed a clear bit), true if we
 * stole a pending instance - either by deleting its armed timer or by
 * unlinking it from a pending_work list. If the timer callback is in
 * flight but hasn't queued the work yet, drop wq_lock, flush timers,
 * and retry.
 */
static bool grab_pending(struct work_struct *work, bool is_dwork)
{
retry:
	if (set_work_pending(work)) {
		BUG_ON(!list_empty(&work->entry));
		return false;
	}

	if (is_dwork) {
		struct delayed_work *dwork = to_delayed_work(work);

		if (likely(del_timer(&dwork->timer))) {
			BUG_ON(!list_empty(&work->entry));
			return true;
		}
	}

	if (!list_empty(&work->entry)) {
		list_del_init(&work->entry);
		return true;
	}

	BUG_ON(!is_dwork);

	pthread_mutex_unlock(&wq_lock);
	flush_timers();
	pthread_mutex_lock(&wq_lock);
	goto retry;
}

/* Is @work currently executing? Caller must hold wq_lock. */
static bool work_running(struct work_struct *work)
{
	struct workqueue_struct *wq;

	list_for_each_entry(wq, &wq_list, list)
		if (wq->current_work == work)
			return true;

	return false;
}

/* Wait until @work is neither pending nor running; true if we waited. */
bool flush_work(struct work_struct *work)
{
	bool ret = false;

	pthread_mutex_lock(&wq_lock);
	while (work_pending(work) || work_running(work)) {
		pthread_cond_wait(&work_finished, &wq_lock);
		ret = true;
	}
	pthread_mutex_unlock(&wq_lock);

	return ret;
}

/* As above, but only waits for a running instance; wq_lock must be held. */
static bool __flush_work(struct work_struct *work)
{
	bool ret = false;

	while (work_running(work)) {
		pthread_cond_wait(&work_finished, &wq_lock);
		ret = true;
	}

	return ret;
}

/* Cancel @work and wait for any running instance; true if it was pending. */
bool cancel_work_sync(struct work_struct *work)
{
	bool ret;

	pthread_mutex_lock(&wq_lock);
	ret = grab_pending(work, false);

	__flush_work(work);
	clear_work_pending(work);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}

bool mod_delayed_work(struct workqueue_struct *wq,
		      struct delayed_work *dwork,
		      unsigned long delay)
{
	struct work_struct *work = &dwork->work;
	bool ret;

	pthread_mutex_lock(&wq_lock);
	ret = grab_pending(work, true);

	__queue_delayed_work(wq, dwork, delay);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}
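
/*
 * Example (hypothetical): mod_delayed_work() is the "restart the timer"
 * operation - whether or not my_dwork was already queued, this arms it
 * to fire HZ jiffies from now, returning true if it was pending before.
 *
 *	mod_delayed_work(system_wq, &my_dwork, HZ);
 */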

/* Cancel a pending @dwork without waiting; true if it was pending. */
bool cancel_delayed_work(struct delayed_work *dwork)
{
	struct work_struct *work = &dwork->work;
	bool ret;

	pthread_mutex_lock(&wq_lock);
	ret = grab_pending(work, true);

	clear_work_pending(&dwork->work);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}

/* As above, but also wait for a running instance to finish. */
bool cancel_delayed_work_sync(struct delayed_work *dwork)
{
	struct work_struct *work = &dwork->work;
	bool ret;

	pthread_mutex_lock(&wq_lock);
	ret = grab_pending(work, true);

	__flush_work(work);
	clear_work_pending(work);
	pthread_mutex_unlock(&wq_lock);

	return ret;
}

static int worker_thread(void *arg)
{
	struct workqueue_struct *wq = arg;
	struct work_struct *work;

	pthread_mutex_lock(&wq_lock);
	while (1) {
		__set_current_state(TASK_INTERRUPTIBLE);
		work = list_first_entry_or_null(&wq->pending_work,
				struct work_struct, entry);
		wq->current_work = work;

		if (kthread_should_stop()) {
			BUG_ON(wq->current_work);
			break;
		}

		if (!work) {
			/* nothing queued: sleep until wake_up_process() */
			pthread_mutex_unlock(&wq_lock);
			schedule();
			pthread_mutex_lock(&wq_lock);
			continue;
		}

		BUG_ON(!work_pending(work));
		list_del_init(&work->entry);
		/* clear PENDING before running so the work can requeue itself */
		clear_work_pending(work);

		pthread_mutex_unlock(&wq_lock);
		work->func(work);
		pthread_mutex_lock(&wq_lock);

		/* wake anyone waiting in flush_work()/__flush_work() */
		pthread_cond_broadcast(&work_finished);
	}
	pthread_mutex_unlock(&wq_lock);

	return 0;
}

void destroy_workqueue(struct workqueue_struct *wq)
{
	kthread_stop(wq->worker);

	pthread_mutex_lock(&wq_lock);
	list_del(&wq->list);
	pthread_mutex_unlock(&wq_lock);

	kfree(wq);
}

/*
 * Note: @flags and @max_active are accepted for API compatibility but
 * ignored - every workqueue here is serviced by a single worker thread.
 */
struct workqueue_struct *alloc_workqueue(const char *fmt,
					 unsigned flags,
					 int max_active,
					 ...)
{
	va_list args;
	struct workqueue_struct *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	INIT_LIST_HEAD(&wq->list);
	INIT_LIST_HEAD(&wq->pending_work);

	va_start(args, max_active);
	vsnprintf(wq->name, sizeof(wq->name), fmt, args);
	va_end(args);

	wq->worker = kthread_run(worker_thread, wq, "%s", wq->name);
	if (IS_ERR(wq->worker)) {
		kfree(wq);
		return NULL;
	}

	pthread_mutex_lock(&wq_lock);
	list_add(&wq->list, &wq_list);
	pthread_mutex_unlock(&wq_lock);

	return wq;
}
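
/*
 * Example (hypothetical): private workqueues follow the usual kernel
 * pattern; since the flags are ignored here, 0 is as good as anything.
 *
 *	struct workqueue_struct *wq = alloc_workqueue("my_wq", 0, 0);
 *
 *	if (wq) {
 *		queue_work(wq, &my_work);
 *		...
 *		destroy_workqueue(wq);	// stops the worker and frees wq
 *	}
 */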

struct workqueue_struct *system_wq;
struct workqueue_struct *system_highpri_wq;
struct workqueue_struct *system_long_wq;
struct workqueue_struct *system_unbound_wq;
struct workqueue_struct *system_freezable_wq;

__attribute__((constructor(102)))
static void wq_init(void)
{
	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_long_wq = alloc_workqueue("events_long", 0, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
	system_freezable_wq = alloc_workqueue("events_freezable",
					      WQ_FREEZABLE, 0);
	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
	       !system_unbound_wq || !system_freezable_wq);
}

__attribute__((destructor(102)))
static void wq_cleanup(void)
{
	destroy_workqueue(system_freezable_wq);
	destroy_workqueue(system_unbound_wq);
	destroy_workqueue(system_long_wq);
	destroy_workqueue(system_highpri_wq);
	destroy_workqueue(system_wq);

	system_wq = system_highpri_wq = system_long_wq = system_unbound_wq =
		system_freezable_wq = NULL;
}