// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "alloc_foreground.h"
#include "btree_iter.h"
#include "buckets.h"
#include "clock.h"
#include "disk_groups.h"
#include "extents.h"
#include "io.h"
#include "move.h"
#include "rebalance.h"
#include "super-io.h"

#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/sched/cputime.h>
#include <trace/events/bcachefs.h>

/*
 * Check whether an extent should be moved:
 * returns -1 if it should not be moved, the device of the pointer that
 * should be moved if known, or INT_MAX if the device is unknown
 */
static int __bch2_rebalance_pred(struct bch_fs *c,
                                 struct bkey_s_c k,
                                 struct bch_io_opts *io_opts)
{
        struct bkey_ptrs_c ptrs = bch2_bkey_ptrs_c(k);
        const union bch_extent_entry *entry;
        struct extent_ptr_decoded p;

        if (io_opts->background_compression &&
            !bch2_bkey_is_incompressible(k))
                bkey_for_each_ptr_decode(k.k, ptrs, p, entry)
                        if (!p.ptr.cached &&
                            p.crc.compression_type !=
                            bch2_compression_opt_to_type[io_opts->background_compression])
                                return p.ptr.dev;

        if (io_opts->background_target)
                bkey_for_each_ptr_decode(k.k, ptrs, p, entry)
                        if (!p.ptr.cached &&
                            !bch2_dev_in_target(c, p.ptr.dev, io_opts->background_target))
                                return p.ptr.dev;

        return -1;
}

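/*
 * Note that @k may have work for the rebalance thread: if
 * __bch2_rebalance_pred() picks out a pointer, add the key's size to that
 * device's pending rebalance work (or to the unknown-device counter), and
 * wake the rebalance thread if the counter was previously empty.
 */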
void bch2_rebalance_add_key(struct bch_fs *c,
                            struct bkey_s_c k,
                            struct bch_io_opts *io_opts)
{
        atomic64_t *counter;
        int dev;

        dev = __bch2_rebalance_pred(c, k, io_opts);
        if (dev < 0)
                return;

        counter = dev < INT_MAX
                ? &bch_dev_bkey_exists(c, dev)->rebalance_work
                : &c->rebalance.work_unknown_dev;

        if (atomic64_add_return(k.k->size, counter) == k.k->size)
                rebalance_wakeup(c);
}

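/*
 * Predicate passed to bch2_move_data(): if __bch2_rebalance_pred() says the
 * extent needs moving, ask the move path to add replicas on the background
 * target; otherwise skip the extent.
 */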
static enum data_cmd rebalance_pred(struct bch_fs *c, void *arg,
                                    struct bkey_s_c k,
                                    struct bch_io_opts *io_opts,
                                    struct data_opts *data_opts)
{
        if (__bch2_rebalance_pred(c, k, io_opts) >= 0) {
                data_opts->target               = io_opts->background_target;
                data_opts->btree_insert_flags   = 0;
                return DATA_ADD_REPLICAS;
        } else {
                return DATA_SKIP;
        }
}

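/*
 * Account @sectors of pending rebalance work that can't be attributed to a
 * particular device, waking the rebalance thread if the counter was
 * previously empty.
 */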
void bch2_rebalance_add_work(struct bch_fs *c, u64 sectors)
{
        if (atomic64_add_return(sectors, &c->rebalance.work_unknown_dev) ==
            sectors)
                rebalance_wakeup(c);
}

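/*
 * Summary of pending rebalance work: the total across all devices, plus the
 * device whose pending work is the largest fraction of its capacity.
 */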
struct rebalance_work {
        int             dev_most_full_idx;
        unsigned        dev_most_full_percent;
        u64             dev_most_full_work;
        u64             dev_most_full_capacity;
        u64             total_work;
};

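/*
 * Fold one device's pending work into @w: work charged to the device plus
 * the unknown-device work counts against its capacity (saturating rather
 * than overflowing), and the proportionally fullest device is remembered.
 * Only @dev_work is added to the running total.
 */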
static void rebalance_work_accumulate(struct rebalance_work *w,
                u64 dev_work, u64 unknown_dev, u64 capacity, int idx)
{
        unsigned percent_full;
        u64 work = dev_work + unknown_dev;

        if (work < dev_work || work < unknown_dev)
                work = U64_MAX;
        work = min(work, capacity);

        percent_full = div64_u64(work * 100, capacity);

        if (percent_full >= w->dev_most_full_percent) {
                w->dev_most_full_idx            = idx;
                w->dev_most_full_percent        = percent_full;
                w->dev_most_full_work           = work;
                w->dev_most_full_capacity       = capacity;
        }

        if (w->total_work + dev_work >= w->total_work &&
            w->total_work + dev_work >= dev_work)
                w->total_work += dev_work;
}

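/*
 * Sum up pending rebalance work across all online members, charging the
 * unknown-device work against each device, then add a filesystem-wide entry
 * (idx -1) for the unknown-device counter against the total capacity.
 */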
static struct rebalance_work rebalance_work(struct bch_fs *c)
{
        struct bch_dev *ca;
        struct rebalance_work ret = { .dev_most_full_idx = -1 };
        u64 unknown_dev = atomic64_read(&c->rebalance.work_unknown_dev);
        unsigned i;

        for_each_online_member(ca, c, i)
                rebalance_work_accumulate(&ret,
                        atomic64_read(&ca->rebalance_work),
                        unknown_dev,
                        bucket_to_sector(ca, ca->mi.nbuckets -
                                         ca->mi.first_bucket),
                        i);

        rebalance_work_accumulate(&ret,
                unknown_dev, 0, c->capacity, -1);

        return ret;
}

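/* Clear all per-device and unknown-device rebalance work counters: */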
static void rebalance_work_reset(struct bch_fs *c)
{
        struct bch_dev *ca;
        unsigned i;

        for_each_online_member(ca, c, i)
                atomic64_set(&ca->rebalance_work, 0);

        atomic64_set(&c->rebalance.work_unknown_dev, 0);
}

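/* Current task's combined user + system CPU time, in jiffies: */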
static unsigned long curr_cputime(void)
{
        u64 utime, stime;

        task_cputime_adjusted(current, &utime, &stime);
        return nsecs_to_jiffies(utime + stime);
}

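/*
 * Main rebalance loop: wait until rebalancing is enabled and there is work to
 * do, throttle CPU and IO when the fullest device is less than 20% full of
 * pending work, and otherwise walk all extents with bch2_move_data(), using
 * rebalance_pred() to pick what gets rewritten.
 */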
static int bch2_rebalance_thread(void *arg)
{
        struct bch_fs *c = arg;
        struct bch_fs_rebalance *r = &c->rebalance;
        struct io_clock *clock = &c->io_clock[WRITE];
        struct rebalance_work w, p;
        unsigned long start, prev_start;
        unsigned long prev_run_time, prev_run_cputime;
        unsigned long cputime, prev_cputime;
        unsigned long io_start;
        long throttle;

        set_freezable();

        io_start        = atomic_long_read(&clock->now);
        p               = rebalance_work(c);
        prev_start      = jiffies;
        prev_cputime    = curr_cputime();

        while (!kthread_wait_freezable(r->enabled)) {
                cond_resched();

                start                   = jiffies;
                cputime                 = curr_cputime();

                prev_run_time           = start - prev_start;
                prev_run_cputime        = cputime - prev_cputime;

                w                       = rebalance_work(c);
                BUG_ON(!w.dev_most_full_capacity);

                if (!w.total_work) {
                        r->state = REBALANCE_WAITING;
                        kthread_wait_freezable(rebalance_work(c).total_work);
                        continue;
                }

                /*
                 * If there isn't much work to do, throttle cpu usage: aim to
                 * use no more CPU time than the fullest device's fill
                 * percentage of wall clock time, sleeping off any excess:
                 */
                throttle = prev_run_cputime * 100 /
                        max(1U, w.dev_most_full_percent) -
                        prev_run_time;

                if (w.dev_most_full_percent < 20 && throttle > 0) {
                        r->state = REBALANCE_THROTTLED;
                        r->throttled_until_iotime = io_start +
                                div_u64(w.dev_most_full_capacity *
                                        (20 - w.dev_most_full_percent),
                                        50);
                        r->throttled_until_cputime = start + throttle;

                        bch2_kthread_io_clock_wait(clock,
                                r->throttled_until_iotime,
                                throttle);
                        continue;
                }

                /* minimum 1MB/sec: */
                r->pd.rate.rate =
                        max_t(u64, 1 << 11,
                              r->pd.rate.rate *
                              max(p.dev_most_full_percent, 1U) /
                              max(w.dev_most_full_percent, 1U));

                io_start        = atomic_long_read(&clock->now);
                p               = w;
                prev_start      = start;
                prev_cputime    = cputime;

                r->state = REBALANCE_RUNNING;
                memset(&r->move_stats, 0, sizeof(r->move_stats));
                rebalance_work_reset(c);

                bch2_move_data(c,
                               /* ratelimiting disabled for now */
                               NULL, /*  &r->pd.rate, */
                               writepoint_ptr(&c->rebalance_write_point),
                               POS_MIN, POS_MAX,
                               rebalance_pred, NULL,
                               &r->move_stats);
        }

        return 0;
}

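/*
 * Print rebalance status to @buf for sysfs: per-device and total pending
 * work, the current rate, and what the rebalance thread is doing.
 */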
ssize_t bch2_rebalance_work_show(struct bch_fs *c, char *buf)
{
        struct printbuf out = _PBUF(buf, PAGE_SIZE);
        struct bch_fs_rebalance *r = &c->rebalance;
        struct rebalance_work w = rebalance_work(c);
        char h1[21], h2[21];

        bch2_hprint(&PBUF(h1), w.dev_most_full_work << 9);
        bch2_hprint(&PBUF(h2), w.dev_most_full_capacity << 9);
        pr_buf(&out, "fullest_dev (%i):\t%s/%s\n",
               w.dev_most_full_idx, h1, h2);

        bch2_hprint(&PBUF(h1), w.total_work << 9);
        bch2_hprint(&PBUF(h2), c->capacity << 9);
        pr_buf(&out, "total work:\t\t%s/%s\n", h1, h2);

        pr_buf(&out, "rate:\t\t\t%u\n", r->pd.rate.rate);

        switch (r->state) {
        case REBALANCE_WAITING:
                pr_buf(&out, "waiting\n");
                break;
        case REBALANCE_THROTTLED:
                bch2_hprint(&PBUF(h1),
                            (r->throttled_until_iotime -
                             atomic_long_read(&c->io_clock[WRITE].now)) << 9);
                pr_buf(&out, "throttled for %lu sec or %s io\n",
                       (r->throttled_until_cputime - jiffies) / HZ,
                       h1);
                break;
        case REBALANCE_RUNNING:
                pr_buf(&out, "running\n");
                pr_buf(&out, "pos %llu:%llu\n",
                       r->move_stats.pos.inode,
                       r->move_stats.pos.offset);
                break;
        }

        return out.pos - buf;
}

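/*
 * Stop the rebalance thread: open up the rate limit so nothing stays blocked
 * on it, clear the thread pointer (synchronizing with rebalance_wakeup() via
 * RCU), then stop and release the kthread.
 */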
void bch2_rebalance_stop(struct bch_fs *c)
{
        struct task_struct *p;

        c->rebalance.pd.rate.rate = UINT_MAX;
        bch2_ratelimit_reset(&c->rebalance.pd.rate);

        p = rcu_dereference_protected(c->rebalance.thread, 1);
        c->rebalance.thread = NULL;

        if (p) {
                /* for synchronizing with rebalance_wakeup() */
                synchronize_rcu();

                kthread_stop(p);
                put_task_struct(p);
        }
}

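/*
 * Create and wake the rebalance thread, taking a reference on its task
 * struct; a nop if the filesystem was mounted with nochanges.
 */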
int bch2_rebalance_start(struct bch_fs *c)
{
        struct task_struct *p;

        if (c->opts.nochanges)
                return 0;

        p = kthread_create(bch2_rebalance_thread, c, "bch_rebalance");
        if (IS_ERR(p))
                return PTR_ERR(p);

        get_task_struct(p);
        rcu_assign_pointer(c->rebalance.thread, p);
        wake_up_process(p);
        return 0;
}

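/*
 * Initialize the rate limiting controller, and start the unknown-device work
 * counter out at a large value so the rebalance thread sees pending work the
 * first time it checks.
 */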
void bch2_fs_rebalance_init(struct bch_fs *c)
{
        bch2_pd_controller_init(&c->rebalance.pd);

        atomic64_set(&c->rebalance.work_unknown_dev, S64_MAX);
}