1 /*
2  * Primary bucket allocation code
3  *
4  * Copyright 2012 Google, Inc.
5  *
6  * Allocation in bcache is done in terms of buckets:
7  *
8  * Each bucket has an associated 8 bit gen; this gen corresponds to the gen in
9  * btree pointers - they must match for the pointer to be considered valid.
10  *
11  * Thus (assuming a bucket has no dirty data or metadata in it) we can reuse a
12  * bucket simply by incrementing its gen.
13  *
14  * The gens (along with the priorities; it's really the gens that are important but
15  * the code is named as if it's the priorities) are written in an arbitrary list
16  * of buckets on disk, with a pointer to them in the journal header.
17  *
18  * When we invalidate a bucket, we have to write its new gen to disk and wait
19  * for that write to complete before we use it - otherwise after a crash we
20  * could have pointers that appeared to be good but pointed to data that had
21  * been overwritten.
22  *
23  * Since the gens and priorities are all stored contiguously on disk, we can
24  * batch this up: We fill up the free_inc list with freshly invalidated buckets,
25  * call prio_write(), and when prio_write() finishes we pull buckets off the
26  * free_inc list and optionally discard them.
27  *
28  * free_inc isn't the only freelist - if it was, we'd often have to sleep while
29  * priorities and gens were being written before we could allocate. c->free is a
30  * smaller freelist, and buckets on that list are always ready to be used.
31  *
32  * If we've got discards enabled, that happens when a bucket moves from the
33  * free_inc list to the free list.
34  *
35  * It's important to ensure that gens don't wrap around - with respect to
36  * either the oldest gen in the btree or the gen on disk. This is quite
37  * difficult to do in practice, but we explicitly guard against it anyway - if
38  * a bucket is in danger of wrapping around we simply skip invalidating it that
39  * time around, and we garbage collect or rewrite the priorities sooner than we
40  * would have otherwise.
41  *
42  * bch_bucket_alloc() allocates a single bucket from a specific cache.
43  *
44  * bch_bucket_alloc_set() allocates one or more buckets from different caches
45  * out of a cache set.
46  *
47  * invalidate_buckets() drives all the processes described above. It's called
48  * from bch_bucket_alloc() and a few other places that need to make sure free
49  * buckets are ready.
50  *
51  * invalidate_buckets_(lru|fifo|random)() find buckets that are available to be
52  * invalidated, and then invalidate them and stick them on the free_inc list -
53  * in lru, fifo, or random order.
54  */
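
/*
 * In code terms, the gen check described above is roughly (a sketch - the
 * real test is ptr_stale(), used further down in this file):
 *
 *	stale = PTR_BUCKET(ca, ptr)->mark.gen != ptr->gen;
 *
 * i.e. incrementing a bucket's gen is what invalidates any extent pointers
 * still referring to it.
 */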
55
56 #include "bcache.h"
57 #include "alloc.h"
58 #include "btree_update.h"
59 #include "buckets.h"
60 #include "checksum.h"
61 #include "clock.h"
62 #include "debug.h"
63 #include "error.h"
64 #include "extents.h"
65 #include "io.h"
66 #include "journal.h"
67 #include "super-io.h"
68
69 #include <linux/blkdev.h>
70 #include <linux/kthread.h>
71 #include <linux/math64.h>
72 #include <linux/random.h>
73 #include <linux/rcupdate.h>
74 #include <trace/events/bcache.h>
75
76 static size_t bch_bucket_alloc(struct cache *, enum alloc_reserve);
77 static void __bch_bucket_free(struct cache *, struct bucket *);
78
79 /* Allocation groups: */
80
81 void bch_dev_group_remove(struct cache_group *grp, struct cache *ca)
82 {
83         unsigned i;
84
85         spin_lock(&grp->lock);
86
87         for (i = 0; i < grp->nr_devices; i++)
88                 if (rcu_access_pointer(grp->d[i].dev) == ca) {
89                         grp->nr_devices--;
90                         memmove(&grp->d[i],
91                                 &grp->d[i + 1],
92                                 (grp->nr_devices - i) * sizeof(grp->d[0]));
93                         break;
94                 }
95
96         spin_unlock(&grp->lock);
97 }
98
99 void bch_dev_group_add(struct cache_group *grp, struct cache *ca)
100 {
101         unsigned i;
102
103         spin_lock(&grp->lock);
104         for (i = 0; i < grp->nr_devices; i++)
105                 if (rcu_access_pointer(grp->d[i].dev) == ca)
106                         goto out;
107
108         BUG_ON(grp->nr_devices >= BCH_SB_MEMBERS_MAX);
109
110         rcu_assign_pointer(grp->d[grp->nr_devices++].dev, ca);
111 out:
112         spin_unlock(&grp->lock);
113 }
114
115 /* Ratelimiting/PD controllers */
116
117 static void pd_controllers_update(struct work_struct *work)
118 {
119         struct cache_set *c = container_of(to_delayed_work(work),
120                                            struct cache_set,
121                                            pd_controllers_update);
122         struct cache *ca;
123         unsigned iter;
124         int i;
125
126         /* All units are in bytes */
127         u64 tier_size[BCH_TIER_MAX];
128         u64 tier_free[BCH_TIER_MAX];
129         u64 tier_dirty[BCH_TIER_MAX];
130         u64 tier0_can_free = 0;
131
132         memset(tier_size, 0, sizeof(tier_size));
133         memset(tier_free, 0, sizeof(tier_free));
134         memset(tier_dirty, 0, sizeof(tier_dirty));
135
136         rcu_read_lock();
137         for (i = BCH_TIER_MAX - 1; i >= 0; --i)
138                 group_for_each_cache_rcu(ca, &c->cache_tiers[i], iter) {
139                         struct bucket_stats_cache stats = bch_bucket_stats_read_cache(ca);
140                         unsigned bucket_bits = ca->bucket_bits + 9;
141
142                         /*
143                          * Bytes of internal fragmentation, which can be
144                          * reclaimed by copy GC
145                          */
146                         s64 fragmented = ((stats.buckets_dirty +
147                                            stats.buckets_cached) <<
148                                           bucket_bits) -
149                                 ((stats.sectors_dirty +
150                                   stats.sectors_cached) << 9);
151
152                         u64 dev_size = (ca->mi.nbuckets -
153                                         ca->mi.first_bucket) << bucket_bits;
154
155                         u64 free = __buckets_free_cache(ca, stats) << bucket_bits;
156
157                         if (fragmented < 0)
158                                 fragmented = 0;
159
160                         bch_pd_controller_update(&ca->moving_gc_pd,
161                                                  free, fragmented, -1);
162
163                         if (i == 0)
164                                 tier0_can_free += fragmented;
165
166                         tier_size[i] += dev_size;
167                         tier_free[i] += free;
168                         tier_dirty[i] += stats.buckets_dirty << bucket_bits;
169                 }
170         rcu_read_unlock();
171
172         if (tier_size[1]) {
173                 u64 target = div_u64(tier_size[0] * c->tiering_percent, 100);
174
175                 tier0_can_free = max_t(s64, 0, tier_dirty[0] - target);
176
177                 bch_pd_controller_update(&c->tiering_pd,
178                                          target,
179                                          tier_dirty[0],
180                                          -1);
181         }
182
183         /*
184          * Throttle foreground writes if tier 0 is running out of free buckets,
185          * and either tiering or copygc can free up space (but don't take both
186          * into account).
187          *
188          * Target will be small if there isn't any work to do - we don't want to
189          * throttle foreground writes if we currently have all the free space
190          * we're ever going to have.
191          *
192          * Otherwise, if there's work to do, try to keep 20% of tier0 available
193          * for foreground writes.
194          */
195         bch_pd_controller_update(&c->foreground_write_pd,
196                                  min(tier0_can_free,
197                                      div_u64(tier_size[0] *
198                                              c->foreground_target_percent,
199                                              100)),
200                                  tier_free[0],
201                                  -1);
202
203         schedule_delayed_work(&c->pd_controllers_update,
204                               c->pd_controllers_update_seconds * HZ);
205 }
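
/*
 * Worked example for the "fragmented" input above (hypothetical numbers):
 * with 128k buckets (bucket_bits = 17), 1000 dirty + cached buckets span
 * 1000 << 17 = 131072000 bytes; if they only hold 100000 live sectors
 * (100000 << 9 = 51200000 bytes), then fragmented = 79872000 bytes
 * (~76 MiB) that copy GC could reclaim by compacting those buckets.
 */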
206
207 /*
208  * Bucket priorities/gens:
209  *
210  * For each bucket, we store on disk its
211  *  - 8 bit gen
212  *  - 16 bit priority
213  *
214  * See alloc.c for an explanation of the gen. The priority is used to implement
215  * lru (and in the future other) cache replacement policies; for most purposes
216  * it's just an opaque integer.
217  *
218  * The gens and the priorities don't have a whole lot to do with each other, and
219  * it's actually the gens that must be written out at specific times - it's no
220  * big deal if the priorities don't get written, if we lose them we just reuse
221  * buckets in suboptimal order.
222  *
223  * On disk they're stored in a packed array, in as many buckets as are required
224  * to fit them all. The buckets we use to store them form a list; the journal
225  * header points to the first bucket, the first bucket points to the second
226  * bucket, et cetera.
227  *
228  * This code is used by the allocation code; periodically (whenever it runs out
229  * of buckets to allocate from) the allocation code will invalidate some
230  * buckets, but it can't use those buckets until their new gens are safely on
231  * disk.
232  */
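
/*
 * Pictorially (a sketch of the layout described above):
 *
 *	journal header --> prio bucket 0 --> prio bucket 1 --> ...
 *
 * where each prio bucket holds one struct prio_set: a header (magic, csum,
 * nonce, next_bucket) followed by packed per-bucket entries (gen, read and
 * write prio), chained via p->next_bucket. The number of prio buckets
 * needed is thus roughly ca->mi.nbuckets / prios_per_bucket(ca), which is
 * presumably what prio_buckets(ca) below evaluates to.
 */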
233
234 static int prio_io(struct cache *ca, uint64_t bucket, int op)
235 {
236         bio_init(ca->bio_prio);
237         bio_set_op_attrs(ca->bio_prio, op, REQ_SYNC|REQ_META);
238
239         ca->bio_prio->bi_max_vecs       = bucket_pages(ca);
240         ca->bio_prio->bi_io_vec         = ca->bio_prio->bi_inline_vecs;
241         ca->bio_prio->bi_iter.bi_sector = bucket * ca->mi.bucket_size;
242         ca->bio_prio->bi_bdev           = ca->disk_sb.bdev;
243         ca->bio_prio->bi_iter.bi_size   = bucket_bytes(ca);
244         bch_bio_map(ca->bio_prio, ca->disk_buckets);
245
246         return submit_bio_wait(ca->bio_prio);
247 }
248
249 static struct nonce prio_nonce(struct prio_set *p)
250 {
251         return (struct nonce) {{
252                 [0] = 0,
253                 [1] = p->nonce[0],
254                 [2] = p->nonce[1],
255                 [3] = p->nonce[2]^BCH_NONCE_PRIO,
256         }};
257 }
258
259 static int bch_prio_write(struct cache *ca)
260 {
261         struct cache_set *c = ca->set;
262         struct journal *j = &c->journal;
263         struct journal_res res = { 0 };
264         bool need_new_journal_entry;
265         int i, ret;
266
267         if (c->opts.nochanges)
268                 return 0;
269
270         trace_bcache_prio_write_start(ca);
271
272         atomic64_add(ca->mi.bucket_size * prio_buckets(ca),
273                      &ca->meta_sectors_written);
274
275         for (i = prio_buckets(ca) - 1; i >= 0; --i) {
276                 struct bucket *g;
277                 struct prio_set *p = ca->disk_buckets;
278                 struct bucket_disk *d = p->data;
279                 struct bucket_disk *end = d + prios_per_bucket(ca);
280                 size_t r;
281
282                 for (r = i * prios_per_bucket(ca);
283                      r < ca->mi.nbuckets && d < end;
284                      r++, d++) {
285                         g = ca->buckets + r;
286                         d->read_prio = cpu_to_le16(g->read_prio);
287                         d->write_prio = cpu_to_le16(g->write_prio);
288                         d->gen = ca->buckets[r].mark.gen;
289                 }
290
291                 p->next_bucket  = cpu_to_le64(ca->prio_buckets[i + 1]);
292                 p->magic        = cpu_to_le64(pset_magic(c));
293                 get_random_bytes(&p->nonce, sizeof(p->nonce));
294
295                 spin_lock(&ca->prio_buckets_lock);
296                 r = bch_bucket_alloc(ca, RESERVE_PRIO);
297                 BUG_ON(!r);
298
299                 /*
300                  * This assignment goes here, before dropping prio_buckets_lock,
301                  * to guard against the bucket getting gc'd from under us
302                  */
303                 ca->prio_buckets[i] = r;
304                 bch_mark_metadata_bucket(ca, ca->buckets + r, false);
305                 spin_unlock(&ca->prio_buckets_lock);
306
307                 SET_PSET_CSUM_TYPE(p, bch_meta_checksum_type(c));
308
309                 bch_encrypt(c, PSET_CSUM_TYPE(p),
310                             prio_nonce(p),
311                             p->encrypted_start,
312                             bucket_bytes(ca) -
313                             offsetof(struct prio_set, encrypted_start));
314
315                 p->csum  = bch_checksum(c, PSET_CSUM_TYPE(p),
316                                         prio_nonce(p),
317                                         (void *) p + sizeof(p->csum),
318                                         bucket_bytes(ca) - sizeof(p->csum));
319
320                 ret = prio_io(ca, r, REQ_OP_WRITE);
321                 if (bch_dev_fatal_io_err_on(ret, ca,
322                                           "prio write to bucket %zu", r) ||
323                     bch_meta_write_fault("prio"))
324                         return ret;
325         }
326
327         spin_lock(&j->lock);
328         j->prio_buckets[ca->dev_idx] = cpu_to_le64(ca->prio_buckets[0]);
329         j->nr_prio_buckets = max_t(unsigned,
330                                    ca->dev_idx + 1,
331                                    j->nr_prio_buckets);
332         spin_unlock(&j->lock);
333
334         do {
335                 unsigned u64s = jset_u64s(0);
336
337                 ret = bch_journal_res_get(j, &res, u64s, u64s);
338                 if (ret)
339                         return ret;
340
341                 need_new_journal_entry = j->buf[res.idx].nr_prio_buckets <
342                         ca->dev_idx + 1;
343                 bch_journal_res_put(j, &res);
344
345                 ret = bch_journal_flush_seq(j, res.seq);
346                 if (ret)
347                         return ret;
348         } while (need_new_journal_entry);
349
350         /*
351          * Don't want the old priorities to get garbage collected until after we
352          * finish writing the new ones, and they're journalled
353          */
354
355         spin_lock(&ca->prio_buckets_lock);
356
357         for (i = 0; i < prio_buckets(ca); i++) {
358                 if (ca->prio_last_buckets[i])
359                         __bch_bucket_free(ca,
360                                 &ca->buckets[ca->prio_last_buckets[i]]);
361
362                 ca->prio_last_buckets[i] = ca->prio_buckets[i];
363         }
364
365         spin_unlock(&ca->prio_buckets_lock);
366
367         trace_bcache_prio_write_end(ca);
368         return 0;
369 }
370
371 int bch_prio_read(struct cache *ca)
372 {
373         struct cache_set *c = ca->set;
374         struct prio_set *p = ca->disk_buckets;
375         struct bucket_disk *d = p->data + prios_per_bucket(ca), *end = d;
376         struct bucket_mark new;
377         struct bch_csum csum;
378         unsigned bucket_nr = 0;
379         u64 bucket, expect, got;
380         size_t b;
381         int ret = 0;
382
383         spin_lock(&c->journal.lock);
384         bucket = le64_to_cpu(c->journal.prio_buckets[ca->dev_idx]);
385         spin_unlock(&c->journal.lock);
386
387         /*
388          * If the device hasn't been used yet, there won't be a prio bucket ptr
389          */
390         if (!bucket)
391                 return 0;
392
393         unfixable_fsck_err_on(bucket < ca->mi.first_bucket ||
394                               bucket >= ca->mi.nbuckets, c,
395                               "bad prio bucket %llu", bucket);
396
397         for (b = 0; b < ca->mi.nbuckets; b++, d++) {
398                 if (d == end) {
399                         ca->prio_last_buckets[bucket_nr] = bucket;
400                         bucket_nr++;
401
402                         ret = prio_io(ca, bucket, REQ_OP_READ);
403                         if (bch_dev_fatal_io_err_on(ret, ca,
404                                         "prio read from bucket %llu",
405                                         bucket) ||
406                             bch_meta_read_fault("prio"))
407                                 return -EIO;
408
409                         got = le64_to_cpu(p->magic);
410                         expect = pset_magic(c);
411                         unfixable_fsck_err_on(got != expect, c,
412                                 "bad magic (got %llu expect %llu) while reading prios from bucket %llu",
413                                 got, expect, bucket);
414
415                         unfixable_fsck_err_on(PSET_CSUM_TYPE(p) >= BCH_CSUM_NR, c,
416                                 "prio bucket with unknown csum type %llu bucket %llu",
417                                 PSET_CSUM_TYPE(p), bucket);
418
419                         csum = bch_checksum(c, PSET_CSUM_TYPE(p),
420                                             prio_nonce(p),
421                                             (void *) p + sizeof(p->csum),
422                                             bucket_bytes(ca) - sizeof(p->csum));
423                         unfixable_fsck_err_on(bch_crc_cmp(csum, p->csum), c,
424                                 "bad checksum reading prios from bucket %llu",
425                                 bucket);
426
427                         bch_encrypt(c, PSET_CSUM_TYPE(p),
428                                     prio_nonce(p),
429                                     p->encrypted_start,
430                                     bucket_bytes(ca) -
431                                     offsetof(struct prio_set, encrypted_start));
432
433                         bucket = le64_to_cpu(p->next_bucket);
434                         d = p->data;
435                 }
436
437                 ca->buckets[b].read_prio = le16_to_cpu(d->read_prio);
438                 ca->buckets[b].write_prio = le16_to_cpu(d->write_prio);
439
440                 bucket_cmpxchg(&ca->buckets[b], new, new.gen = d->gen);
441         }
442 fsck_err:
443         return 0;
444 }
445
446 #define BUCKET_GC_GEN_MAX       96U
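
/*
 * Why the cap matters: gens are 8 bit, so gen arithmetic wraps mod 256.
 * Roughly (hypothetical values):
 *
 *	u8 bucket_gen = 10;	(matches some pointer's gen, so it's valid)
 *
 * After 256 more increments bucket_gen is 10 again, and that long-stale
 * pointer would look valid once more. Refusing to invalidate a bucket once
 * bucket_gc_gen() reaches BUCKET_GC_GEN_MAX, and kicking gc instead (see
 * wait_buckets_available()), keeps a bucket's gen from ever wrapping past
 * a stale pointer.
 */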
447
448 /**
449  * wait_buckets_available - wait on reclaimable buckets
450  *
451  * If there aren't enough available buckets to fill up free_inc, wait until
452  * there are.
453  */
454 static int wait_buckets_available(struct cache *ca)
455 {
456         struct cache_set *c = ca->set;
457         int ret = 0;
458
459         while (1) {
460                 set_current_state(TASK_INTERRUPTIBLE);
461                 if (kthread_should_stop()) {
462                         ret = -1;
463                         break;
464                 }
465
466                 if (ca->inc_gen_needs_gc >= fifo_free(&ca->free_inc)) {
467                         if (c->gc_thread) {
468                                 trace_bcache_gc_cannot_inc_gens(ca->set);
469                                 atomic_inc(&c->kick_gc);
470                                 wake_up_process(ca->set->gc_thread);
471                         }
472
473                         /*
474                          * We are going to wait for GC to wake us up, even if
475                          * bucket counters tell us enough buckets are available,
476                          * because we are actually waiting for GC to rewrite
477                          * nodes with stale pointers
478                          */
479                 } else if (buckets_available_cache(ca) >=
480                            fifo_free(&ca->free_inc))
481                         break;
482
483                 up_read(&ca->set->gc_lock);
484                 schedule();
485                 try_to_freeze();
486                 down_read(&ca->set->gc_lock);
487         }
488
489         __set_current_state(TASK_RUNNING);
490         return ret;
491 }
492
493 static void verify_not_on_freelist(struct cache *ca, size_t bucket)
494 {
495         if (expensive_debug_checks(ca->set)) {
496                 size_t iter;
497                 long i;
498                 unsigned j;
499
500                 for (iter = 0; iter < prio_buckets(ca) * 2; iter++)
501                         BUG_ON(ca->prio_buckets[iter] == bucket);
502
503                 for (j = 0; j < RESERVE_NR; j++)
504                         fifo_for_each_entry(i, &ca->free[j], iter)
505                                 BUG_ON(i == bucket);
506                 fifo_for_each_entry(i, &ca->free_inc, iter)
507                         BUG_ON(i == bucket);
508         }
509 }
510
511 /* Bucket heap / gen */
512
513 void bch_recalc_min_prio(struct cache *ca, int rw)
514 {
515         struct cache_set *c = ca->set;
516         struct prio_clock *clock = &c->prio_clock[rw];
517         struct bucket *g;
518         u16 max_delta = 1;
519         unsigned i;
520
521         /* Determine min prio for this particular cache */
522         for_each_bucket(g, ca)
523                 max_delta = max(max_delta, (u16) (clock->hand - g->prio[rw]));
524
525         ca->min_prio[rw] = clock->hand - max_delta;
526
527         /*
528                  * This may also increase the min prio for the whole cache set;
529                  * check that as well.
530          */
531         max_delta = 1;
532
533         for_each_cache(ca, c, i)
534                 max_delta = max(max_delta,
535                                 (u16) (clock->hand - ca->min_prio[rw]));
536
537         clock->min_prio = clock->hand - max_delta;
538 }
539
540 static void bch_rescale_prios(struct cache_set *c, int rw)
541 {
542         struct prio_clock *clock = &c->prio_clock[rw];
543         struct cache *ca;
544         struct bucket *g;
545         unsigned i;
546
547         trace_bcache_rescale_prios(c);
548
549         for_each_cache(ca, c, i) {
550                 for_each_bucket(g, ca)
551                         g->prio[rw] = clock->hand -
552                                 (clock->hand - g->prio[rw]) / 2;
553
554                 bch_recalc_min_prio(ca, rw);
555         }
556 }
557
558 static void bch_inc_clock_hand(struct io_timer *timer)
559 {
560         struct prio_clock *clock = container_of(timer,
561                                         struct prio_clock, rescale);
562         struct cache_set *c = container_of(clock,
563                                 struct cache_set, prio_clock[clock->rw]);
564         u64 capacity;
565
566         mutex_lock(&c->bucket_lock);
567
568         clock->hand++;
569
570         /* if clock cannot be advanced more, rescale prio */
571         if (clock->hand == (u16) (clock->min_prio - 1))
572                 bch_rescale_prios(c, clock->rw);
573
574         mutex_unlock(&c->bucket_lock);
575
576         capacity = READ_ONCE(c->capacity);
577
578         if (!capacity)
579                 return;
580
581         /*
582                  * We only advance the clock hand once ~0.1% of the cache set's
583                  * capacity has been read or written to (worked example below)
584          *
585          * XXX: we shouldn't really be going off of the capacity of devices in
586          * RW mode (that will be 0 when we're RO, yet we can still service
587          * reads)
588          */
589         timer->expire += capacity >> 10;
590
591         bch_io_timer_add(&c->io_clock[clock->rw], timer);
592 }
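
/*
 * Worked example for the 0.1% rule above, assuming c->capacity is in 512
 * byte sectors (as the << 9 conversions elsewhere in this file suggest): a
 * 1 TiB cache set has capacity = 2^31 sectors, so
 *
 *	timer->expire += capacity >> 10;
 *
 * schedules the next tick 2^21 sectors (1 GiB) of reads or writes later -
 * i.e. the prio clock hand advances once per ~1 GiB of IO.
 */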
593
594 static void bch_prio_timer_init(struct cache_set *c, int rw)
595 {
596         struct prio_clock *clock = &c->prio_clock[rw];
597         struct io_timer *timer = &clock->rescale;
598
599         clock->rw       = rw;
600         timer->fn       = bch_inc_clock_hand;
601         timer->expire   = c->capacity >> 10;
602 }
603
604 /*
605  * Background allocation thread: scans for buckets to be invalidated,
606  * invalidates them, rewrites prios/gens (marking them as invalidated on disk),
607  * then optionally issues discard commands to the newly free buckets, then puts
608  * them on the various freelists.
609  */
610
611 static inline bool can_inc_bucket_gen(struct cache *ca, struct bucket *g)
612 {
613         return bucket_gc_gen(ca, g) < BUCKET_GC_GEN_MAX;
614 }
615
616 static bool bch_can_invalidate_bucket(struct cache *ca, struct bucket *g)
617 {
618         if (!is_available_bucket(READ_ONCE(g->mark)))
619                 return false;
620
621         if (bucket_gc_gen(ca, g) >= BUCKET_GC_GEN_MAX - 1)
622                 ca->inc_gen_needs_gc++;
623
624         return can_inc_bucket_gen(ca, g);
625 }
626
627 static void bch_invalidate_one_bucket(struct cache *ca, struct bucket *g)
628 {
629         spin_lock(&ca->freelist_lock);
630
631         bch_invalidate_bucket(ca, g);
632
633         g->read_prio = ca->set->prio_clock[READ].hand;
634         g->write_prio = ca->set->prio_clock[WRITE].hand;
635
636         verify_not_on_freelist(ca, g - ca->buckets);
637         BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));
638
639         spin_unlock(&ca->freelist_lock);
640 }
641
642 /*
643  * Determines what order we're going to reuse buckets, smallest
644  * bucket_sort_key() first.
645  *
647  * - We take into account the read prio of the bucket, which gives us an
648  *   indication of how hot the data is -- we scale the prio so that the prio
649  *   farthest from the clock is worth 1/8th of the closest.
650  *
651  * - The number of sectors of cached data in the bucket, which gives us an
652  *   indication of the cost in cache misses this eviction will cause.
653  *
654  * - The difference between the bucket's current gen and oldest gen of any
655  *   pointer into it, which gives us an indication of the cost of an eventual
656  *   btree GC to rewrite nodes with stale pointers.
657  */
658
659 #define bucket_sort_key(g)                                              \
660 ({                                                                      \
661         unsigned long prio = g->read_prio - ca->min_prio[READ];         \
662         prio = (prio * 7) / (ca->set->prio_clock[READ].hand -           \
663                              ca->min_prio[READ]);                       \
664                                                                         \
665         (((prio + 1) * bucket_sectors_used(g)) << 8) | bucket_gc_gen(ca, g);\
666 })
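
/*
 * Worked example (hypothetical numbers): with the READ hand at 1000 and
 * ca->min_prio[READ] at 200, a bucket last read at prio 900 gets
 *
 *	prio = (900 - 200) * 7 / (1000 - 200) = 6
 *
 * so its key is (6 + 1) * bucket_sectors_used(g) in the high bits, with
 * bucket_gc_gen() as a tiebreak in the low 8 bits - colder, emptier buckets
 * sort first.
 */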
667
668 static void invalidate_buckets_lru(struct cache *ca)
669 {
670         struct bucket_heap_entry e;
671         struct bucket *g;
672         unsigned i;
673
674         mutex_lock(&ca->heap_lock);
675
676         ca->heap.used = 0;
677
678         mutex_lock(&ca->set->bucket_lock);
679         bch_recalc_min_prio(ca, READ);
680         bch_recalc_min_prio(ca, WRITE);
681
682         /*
683          * Find buckets with lowest read priority, by building a maxheap sorted
684          * by read priority and repeatedly replacing the maximum element until
685          * all buckets have been visited.
686          */
687         for_each_bucket(g, ca) {
688                 if (!bch_can_invalidate_bucket(ca, g))
689                         continue;
690
691                 bucket_heap_push(ca, g, bucket_sort_key(g));
692         }
693
694         /* Sort buckets by physical location on disk for better locality */
695         for (i = 0; i < ca->heap.used; i++) {
696                 struct bucket_heap_entry *e = &ca->heap.data[i];
697
698                 e->val = e->g - ca->buckets;
699         }
700
701         heap_resort(&ca->heap, bucket_max_cmp);
702
703         /*
704          * If we run out of buckets to invalidate, bch_allocator_thread() will
705          * kick stuff and retry us
706          */
707         while (!fifo_full(&ca->free_inc) &&
708                heap_pop(&ca->heap, e, bucket_max_cmp)) {
709                 BUG_ON(!bch_can_invalidate_bucket(ca, e.g));
710                 bch_invalidate_one_bucket(ca, e.g);
711         }
712
713         mutex_unlock(&ca->set->bucket_lock);
714         mutex_unlock(&ca->heap_lock);
715 }
716
717 static void invalidate_buckets_fifo(struct cache *ca)
718 {
719         struct bucket *g;
720         size_t checked = 0;
721
722         while (!fifo_full(&ca->free_inc)) {
723                 if (ca->fifo_last_bucket <  ca->mi.first_bucket ||
724                     ca->fifo_last_bucket >= ca->mi.nbuckets)
725                         ca->fifo_last_bucket = ca->mi.first_bucket;
726
727                 g = ca->buckets + ca->fifo_last_bucket++;
728
729                 if (bch_can_invalidate_bucket(ca, g))
730                         bch_invalidate_one_bucket(ca, g);
731
732                 if (++checked >= ca->mi.nbuckets)
733                         return;
734         }
735 }
736
737 static void invalidate_buckets_random(struct cache *ca)
738 {
739         struct bucket *g;
740         size_t checked = 0;
741
742         while (!fifo_full(&ca->free_inc)) {
743                 size_t n = bch_rand_range(ca->mi.nbuckets -
744                                           ca->mi.first_bucket) +
745                         ca->mi.first_bucket;
746
747                 g = ca->buckets + n;
748
749                 if (bch_can_invalidate_bucket(ca, g))
750                         bch_invalidate_one_bucket(ca, g);
751
752                 if (++checked >= ca->mi.nbuckets / 2)
753                         return;
754         }
755 }
756
757 static void invalidate_buckets(struct cache *ca)
758 {
759         ca->inc_gen_needs_gc = 0;
760
761         switch (ca->mi.replacement) {
762         case CACHE_REPLACEMENT_LRU:
763                 invalidate_buckets_lru(ca);
764                 break;
765         case CACHE_REPLACEMENT_FIFO:
766                 invalidate_buckets_fifo(ca);
767                 break;
768         case CACHE_REPLACEMENT_RANDOM:
769                 invalidate_buckets_random(ca);
770                 break;
771         }
772 }
773
774 static bool __bch_allocator_push(struct cache *ca, long bucket)
775 {
776         if (fifo_push(&ca->free[RESERVE_PRIO], bucket))
777                 goto success;
778
779         if (fifo_push(&ca->free[RESERVE_MOVINGGC], bucket))
780                 goto success;
781
782         if (fifo_push(&ca->free[RESERVE_BTREE], bucket))
783                 goto success;
784
785         if (fifo_push(&ca->free[RESERVE_NONE], bucket))
786                 goto success;
787
788         return false;
789 success:
790         closure_wake_up(&ca->set->freelist_wait);
791         return true;
792 }
793
794 static bool bch_allocator_push(struct cache *ca, long bucket)
795 {
796         bool ret;
797
798         spin_lock(&ca->freelist_lock);
799         ret = __bch_allocator_push(ca, bucket);
800         if (ret)
801                 fifo_pop(&ca->free_inc, bucket);
802         spin_unlock(&ca->freelist_lock);
803
804         return ret;
805 }
806
807 static void bch_find_empty_buckets(struct cache_set *c, struct cache *ca)
808 {
809         u16 last_seq_ondisk = c->journal.last_seq_ondisk;
810         struct bucket *g;
811
812         for_each_bucket(g, ca) {
813                 struct bucket_mark m = READ_ONCE(g->mark);
814
815                 if (is_available_bucket(m) &&
816                     !m.cached_sectors &&
817                     !m.had_metadata &&
818                     (!m.wait_on_journal ||
819                      ((s16) last_seq_ondisk - (s16) m.journal_seq >= 0))) {
820                         spin_lock(&ca->freelist_lock);
821
822                         bch_mark_alloc_bucket(ca, g, true);
823                         g->read_prio = ca->set->prio_clock[READ].hand;
824                         g->write_prio = ca->set->prio_clock[WRITE].hand;
825
826                         verify_not_on_freelist(ca, g - ca->buckets);
827                         BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));
828
829                         spin_unlock(&ca->freelist_lock);
830
831                         if (fifo_full(&ca->free_inc))
832                                 break;
833                 }
834         }
835 }
836
837 /**
838  * bch_allocator_thread - move buckets from free_inc to reserves
839  *
840  * The free_inc FIFO is populated by invalidate_buckets(), and
841  * the reserves are depleted by bucket allocation. When we run out
842  * of free_inc, try to invalidate some buckets and write out
843  * prios and gens.
844  */
845 static int bch_allocator_thread(void *arg)
846 {
847         struct cache *ca = arg;
848         struct cache_set *c = ca->set;
849         int ret;
850
851         set_freezable();
852
853         while (1) {
854                 /*
855                  * First, we pull buckets off of the free_inc list, possibly
856                  * issue discards to them, then we add the bucket to a
857                  * free list:
858                  */
859
860                 while (!fifo_empty(&ca->free_inc)) {
861                         long bucket = fifo_peek(&ca->free_inc);
862
863                         /*
864                          * Don't remove from free_inc until after it's added
865                          * to freelist, so gc doesn't miss it while we've
866                          * dropped bucket lock
867                          */
868
869                         if (ca->mi.discard &&
870                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
871                                 blkdev_issue_discard(ca->disk_sb.bdev,
872                                         bucket_to_sector(ca, bucket),
873                                         ca->mi.bucket_size, GFP_NOIO, 0);
874
875                         while (1) {
876                                 set_current_state(TASK_INTERRUPTIBLE);
877                                 if (bch_allocator_push(ca, bucket))
878                                         break;
879
880                                 if (kthread_should_stop()) {
881                                         __set_current_state(TASK_RUNNING);
882                                         goto out;
883                                 }
884                                 schedule();
885                                 try_to_freeze();
886                         }
887
888                         __set_current_state(TASK_RUNNING);
889                 }
890
891                 down_read(&c->gc_lock);
892
893                 /*
894                  * See if we have buckets we can reuse without invalidating them
895                  * or forcing a journal commit:
896                  */
897                 bch_find_empty_buckets(c, ca);
898
899                 if (fifo_used(&ca->free_inc) * 2 > ca->free_inc.size) {
900                         up_read(&c->gc_lock);
901                         continue;
902                 }
903
904                 /* We've run out of free buckets! */
905
906                 while (!fifo_full(&ca->free_inc)) {
907                         if (wait_buckets_available(ca)) {
908                                 up_read(&c->gc_lock);
909                                 goto out;
910                         }
911
912                         /*
913                          * Find some buckets that we can invalidate, either
914                          * they're completely unused, or only contain clean data
915                          * that's been written back to the backing device or
916                          * another cache tier
917                          */
918
919                         invalidate_buckets(ca);
920                         trace_bcache_alloc_batch(ca, fifo_used(&ca->free_inc),
921                                                  ca->free_inc.size);
922                 }
923
924                 up_read(&c->gc_lock);
925
926                 /*
927                  * free_inc is full of newly-invalidated buckets, must write out
928                  * prios and gens before they can be re-used
929                  */
930                 ret = bch_prio_write(ca);
931                 if (ret) {
932                         /*
933                          * Emergency read only - allocator thread has to
934                          * shutdown.
935                          *
936                          * N.B. we better be going into RO mode, else
937                          * allocations would hang indefinitely - whatever
938                          * generated the error will have sent us into RO mode.
939                          *
940                          * Clear out the free_inc freelist so things are
941                          * consistent-ish:
942                          */
943                         spin_lock(&ca->freelist_lock);
944                         while (!fifo_empty(&ca->free_inc)) {
945                                 long bucket;
946
947                                 fifo_pop(&ca->free_inc, bucket);
948                                 bch_mark_free_bucket(ca, ca->buckets + bucket);
949                         }
950                         spin_unlock(&ca->freelist_lock);
951                         goto out;
952                 }
953         }
954 out:
955         /*
956          * Avoid a race with bucket_stats_update() trying to wake us up after
957          * we've exited:
958          */
959         synchronize_rcu();
960         return 0;
961 }
962
963 /* Allocation */
964
965 /**
966  * bch_bucket_alloc - allocate a single bucket from a specific device
967  *
968  * Returns index of bucket on success, 0 on failure
969  */
970 static size_t bch_bucket_alloc(struct cache *ca, enum alloc_reserve reserve)
971 {
972         struct bucket *g;
973         long r;
974
975         spin_lock(&ca->freelist_lock);
976         if (fifo_pop(&ca->free[RESERVE_NONE], r) ||
977             fifo_pop(&ca->free[reserve], r))
978                 goto out;
979
980         spin_unlock(&ca->freelist_lock);
981
982         trace_bcache_bucket_alloc_fail(ca, reserve);
983         return 0;
984 out:
985         verify_not_on_freelist(ca, r);
986         spin_unlock(&ca->freelist_lock);
987
988         trace_bcache_bucket_alloc(ca, reserve);
989
990         bch_wake_allocator(ca);
991
992         g = ca->buckets + r;
993
994         g->read_prio = ca->set->prio_clock[READ].hand;
995         g->write_prio = ca->set->prio_clock[WRITE].hand;
996
997         return r;
998 }
999
1000 static void __bch_bucket_free(struct cache *ca, struct bucket *g)
1001 {
1002         bch_mark_free_bucket(ca, g);
1003
1004         g->read_prio = ca->set->prio_clock[READ].hand;
1005         g->write_prio = ca->set->prio_clock[WRITE].hand;
1006 }
1007
1008 enum bucket_alloc_ret {
1009         ALLOC_SUCCESS,
1010         NO_DEVICES,             /* -EROFS */
1011         FREELIST_EMPTY,         /* Allocator thread not keeping up */
1012 };
1013
1014 static void recalc_alloc_group_weights(struct cache_set *c,
1015                                        struct cache_group *devs)
1016 {
1017         struct cache *ca;
1018         u64 available_buckets = 1; /* avoid a divide by zero... */
1019         unsigned i;
1020
1021         for (i = 0; i < devs->nr_devices; i++) {
1022                 ca = devs->d[i].dev;
1023
1024                 devs->d[i].weight = buckets_free_cache(ca);
1025                 available_buckets += devs->d[i].weight;
1026         }
1027
1028         for (i = 0; i < devs->nr_devices; i++) {
1029                 const unsigned min_weight = U32_MAX >> 4;
1030                 const unsigned max_weight = U32_MAX;
1031
1032                 devs->d[i].weight =
1033                         min_weight +
1034                         div64_u64(devs->d[i].weight *
1035                                   devs->nr_devices *
1036                                   (max_weight - min_weight),
1037                                   available_buckets);
1038                 devs->d[i].weight = min_t(u64, devs->d[i].weight, max_weight);
1039         }
1040 }
1041
1042 static enum bucket_alloc_ret bch_bucket_alloc_group(struct cache_set *c,
1043                                                     struct open_bucket *ob,
1044                                                     enum alloc_reserve reserve,
1045                                                     unsigned nr_replicas,
1046                                                     struct cache_group *devs,
1047                                                     long *caches_used)
1048 {
1049         enum bucket_alloc_ret ret;
1050         unsigned fail_idx = -1, i;
1051         unsigned available = 0;
1052
1053         BUG_ON(nr_replicas > ARRAY_SIZE(ob->ptrs));
1054
1055         if (ob->nr_ptrs >= nr_replicas)
1056                 return ALLOC_SUCCESS;
1057
1058         rcu_read_lock();
1059         spin_lock(&devs->lock);
1060
1061         for (i = 0; i < devs->nr_devices; i++)
1062                 available += !test_bit(devs->d[i].dev->dev_idx,
1063                                        caches_used);
1064
1065         recalc_alloc_group_weights(c, devs);
1066
1067         i = devs->cur_device;
1068
1069         while (ob->nr_ptrs < nr_replicas) {
1070                 struct cache *ca;
1071                 u64 bucket;
1072
1073                 if (!available) {
1074                         ret = NO_DEVICES;
1075                         goto err;
1076                 }
1077
1078                 i++;
1079                 i %= devs->nr_devices;
1080
1081                 ret = FREELIST_EMPTY;
1082                 if (i == fail_idx)
1083                         goto err;
1084
1085                 ca = devs->d[i].dev;
1086
1087                 if (test_bit(ca->dev_idx, caches_used))
1088                         continue;
1089
1090                 if (fail_idx == -1 &&
1091                     get_random_int() > devs->d[i].weight)
1092                         continue;
1093
1094                 bucket = bch_bucket_alloc(ca, reserve);
1095                 if (!bucket) {
1096                         if (fail_idx == -1)
1097                                 fail_idx = i;
1098                         continue;
1099                 }
1100
1101                 /*
1102                  * open_bucket_add_buckets expects new pointers at the head of
1103                  * the list:
1104                  */
1105                 memmove(&ob->ptrs[1],
1106                         &ob->ptrs[0],
1107                         ob->nr_ptrs * sizeof(ob->ptrs[0]));
1108                 memmove(&ob->ptr_offset[1],
1109                         &ob->ptr_offset[0],
1110                         ob->nr_ptrs * sizeof(ob->ptr_offset[0]));
1111                 ob->nr_ptrs++;
1112                 ob->ptrs[0] = (struct bch_extent_ptr) {
1113                         .gen    = ca->buckets[bucket].mark.gen,
1114                         .offset = bucket_to_sector(ca, bucket),
1115                         .dev    = ca->dev_idx,
1116                 };
1117                 ob->ptr_offset[0] = 0;
1118
1119                 __set_bit(ca->dev_idx, caches_used);
1120                 available--;
1121                 devs->cur_device = i;
1122         }
1123
1124         ret = ALLOC_SUCCESS;
1125 err:
1126         EBUG_ON(ret != ALLOC_SUCCESS && reserve == RESERVE_MOVINGGC);
1127         spin_unlock(&devs->lock);
1128         rcu_read_unlock();
1129         return ret;
1130 }
1131
1132 static enum bucket_alloc_ret __bch_bucket_alloc_set(struct cache_set *c,
1133                                                     struct write_point *wp,
1134                                                     struct open_bucket *ob,
1135                                                     unsigned nr_replicas,
1136                                                     enum alloc_reserve reserve,
1137                                                     long *caches_used)
1138 {
1139         /*
1140          * this should implement policy - for a given type of allocation, decide
1141          * which devices to allocate from:
1142          *
1143          * XXX: switch off wp->type and do something more intelligent here
1144          */
1145
1146         /* foreground writes: prefer tier 0: */
1147         if (wp->group == &c->cache_all)
1148                 bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
1149                                        &c->cache_tiers[0], caches_used);
1150
1151         return bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
1152                                       wp->group, caches_used);
1153 }
1154
1155 static int bch_bucket_alloc_set(struct cache_set *c, struct write_point *wp,
1156                                 struct open_bucket *ob, unsigned nr_replicas,
1157                                 enum alloc_reserve reserve, long *caches_used,
1158                                 struct closure *cl)
1159 {
1160         bool waiting = false;
1161
1162         while (1) {
1163                 switch (__bch_bucket_alloc_set(c, wp, ob, nr_replicas,
1164                                                reserve, caches_used)) {
1165                 case ALLOC_SUCCESS:
1166                         if (waiting)
1167                                 closure_wake_up(&c->freelist_wait);
1168
1169                         return 0;
1170
1171                 case NO_DEVICES:
1172                         if (waiting)
1173                                 closure_wake_up(&c->freelist_wait);
1174                         return -EROFS;
1175
1176                 case FREELIST_EMPTY:
1177                         if (!cl || waiting)
1178                                 trace_bcache_freelist_empty_fail(c,
1179                                                         reserve, cl);
1180
1181                         if (!cl)
1182                                 return -ENOSPC;
1183
1184                         if (waiting)
1185                                 return -EAGAIN;
1186
1187                         /* Retry allocation after adding ourself to waitlist: */
1188                         closure_wait(&c->freelist_wait, cl);
1189                         waiting = true;
1190                         break;
1191                 default:
1192                         BUG();
1193                 }
1194         }
1195 }
1196
1197 /* Open buckets: */
1198
1199 /*
1200  * Open buckets represent one or more buckets (on multiple devices) that are
1201  * currently being allocated from. They serve two purposes:
1202  *
1203  *  - They track buckets that have been partially allocated, allowing for
1204  *    sub-bucket sized allocations - they're used by the sector allocator below
1205  *
1206  *  - They provide a reference to the buckets they own that mark and sweep GC
1207  *    can find, until the new allocation has a pointer to it inserted into the
1208  *    btree
1209  *
1210  * When allocating some space with the sector allocator, the allocation comes
1211  * with a reference to an open bucket - the caller is required to put that
1212  * reference _after_ doing the index update that makes its allocation reachable.
1213  */
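
/*
 * Typical use of the interfaces below (a sketch; error handling and the
 * write path itself omitted):
 *
 *	ob = bch_alloc_sectors_start(c, wp, nr_replicas, reserve, cl);
 *	bch_alloc_sectors_append_ptrs(c, e, nr_replicas, ob, sectors);
 *	bch_alloc_sectors_done(c, wp, ob);
 *
 *	... write the data, insert the new extent into the btree ...
 *
 *	bch_open_bucket_put(c, ob);
 *
 * with the final put happening only after the index update, per the rule
 * above.
 */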
1214
1215 static void __bch_open_bucket_put(struct cache_set *c, struct open_bucket *ob)
1216 {
1217         const struct bch_extent_ptr *ptr;
1218         struct cache *ca;
1219
1220         lockdep_assert_held(&c->open_buckets_lock);
1221
1222         rcu_read_lock();
1223         open_bucket_for_each_online_device(c, ob, ptr, ca)
1224                 bch_mark_alloc_bucket(ca, PTR_BUCKET(ca, ptr), false);
1225         rcu_read_unlock();
1226
1227         ob->nr_ptrs = 0;
1228
1229         list_move(&ob->list, &c->open_buckets_free);
1230         c->open_buckets_nr_free++;
1231         closure_wake_up(&c->open_buckets_wait);
1232 }
1233
1234 void bch_open_bucket_put(struct cache_set *c, struct open_bucket *b)
1235 {
1236         if (atomic_dec_and_test(&b->pin)) {
1237                 spin_lock(&c->open_buckets_lock);
1238                 __bch_open_bucket_put(c, b);
1239                 spin_unlock(&c->open_buckets_lock);
1240         }
1241 }
1242
1243 static struct open_bucket *bch_open_bucket_get(struct cache_set *c,
1244                                                unsigned nr_reserved,
1245                                                struct closure *cl)
1246 {
1247         struct open_bucket *ret;
1248
1249         spin_lock(&c->open_buckets_lock);
1250
1251         if (c->open_buckets_nr_free > nr_reserved) {
1252                 BUG_ON(list_empty(&c->open_buckets_free));
1253                 ret = list_first_entry(&c->open_buckets_free,
1254                                        struct open_bucket, list);
1255                 list_move(&ret->list, &c->open_buckets_open);
1256                 BUG_ON(ret->nr_ptrs);
1257
1258                 atomic_set(&ret->pin, 1); /* XXX */
1259                 ret->has_full_ptrs      = false;
1260
1261                 c->open_buckets_nr_free--;
1262                 trace_bcache_open_bucket_alloc(c, cl);
1263         } else {
1264                 trace_bcache_open_bucket_alloc_fail(c, cl);
1265
1266                 if (cl) {
1267                         closure_wait(&c->open_buckets_wait, cl);
1268                         ret = ERR_PTR(-EAGAIN);
1269                 } else
1270                         ret = ERR_PTR(-ENOSPC);
1271         }
1272
1273         spin_unlock(&c->open_buckets_lock);
1274
1275         return ret;
1276 }
1277
1278 static unsigned ob_ptr_sectors_free(struct open_bucket *ob,
1279                                     struct cache_member_rcu *mi,
1280                                     struct bch_extent_ptr *ptr)
1281 {
1282         unsigned i = ptr - ob->ptrs;
1283         unsigned bucket_size = mi->m[ptr->dev].bucket_size;
1284         unsigned used = (ptr->offset & (bucket_size - 1)) +
1285                 ob->ptr_offset[i];
1286
1287         BUG_ON(used > bucket_size);
1288
1289         return bucket_size - used;
1290 }
1291
1292 static unsigned open_bucket_sectors_free(struct cache_set *c,
1293                                          struct open_bucket *ob,
1294                                          unsigned nr_replicas)
1295 {
1296         struct cache_member_rcu *mi = cache_member_info_get(c);
1297         unsigned i, sectors_free = UINT_MAX;
1298
1299         BUG_ON(nr_replicas > ob->nr_ptrs);
1300
1301         for (i = 0; i < nr_replicas; i++)
1302                 sectors_free = min(sectors_free,
1303                                    ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]));
1304
1305         cache_member_info_put();
1306
1307         return sectors_free != UINT_MAX ? sectors_free : 0;
1308 }
1309
1310 static void open_bucket_copy_unused_ptrs(struct cache_set *c,
1311                                          struct open_bucket *new,
1312                                          struct open_bucket *old)
1313 {
1314         struct cache_member_rcu *mi = cache_member_info_get(c);
1315         unsigned i;
1316
1317         for (i = 0; i < old->nr_ptrs; i++)
1318                 if (ob_ptr_sectors_free(old, mi, &old->ptrs[i])) {
1319                         struct bch_extent_ptr tmp = old->ptrs[i];
1320
1321                         tmp.offset += old->ptr_offset[i];
1322                         new->ptrs[new->nr_ptrs] = tmp;
1323                         new->ptr_offset[new->nr_ptrs] = 0;
1324                         new->nr_ptrs++;
1325                 }
1326         cache_member_info_put();
1327 }
1328
1329 static void verify_not_stale(struct cache_set *c, const struct open_bucket *ob)
1330 {
1331 #ifdef CONFIG_BCACHE_DEBUG
1332         const struct bch_extent_ptr *ptr;
1333         struct cache *ca;
1334
1335         rcu_read_lock();
1336         open_bucket_for_each_online_device(c, ob, ptr, ca)
1337                 BUG_ON(ptr_stale(ca, ptr));
1338         rcu_read_unlock();
1339 #endif
1340 }
1341
1342 /* Sector allocator */
1343
1344 static struct open_bucket *lock_writepoint(struct cache_set *c,
1345                                            struct write_point *wp)
1346 {
1347         struct open_bucket *ob;
1348
1349         while ((ob = ACCESS_ONCE(wp->b))) {
1350                 mutex_lock(&ob->lock);
1351                 if (wp->b == ob)
1352                         break;
1353
1354                 mutex_unlock(&ob->lock);
1355         }
1356
1357         return ob;
1358 }
1359
1360 static int open_bucket_add_buckets(struct cache_set *c,
1361                                    struct write_point *wp,
1362                                    struct open_bucket *ob,
1363                                    unsigned nr_replicas,
1364                                    enum alloc_reserve reserve,
1365                                    struct closure *cl)
1366 {
1367         long caches_used[BITS_TO_LONGS(BCH_SB_MEMBERS_MAX)];
1368         int i, dst;
1369
1370         /*
1371          * We might be allocating pointers to add to an existing extent
1372          * (tiering/copygc/migration) - if so, some of the pointers in our
1373          * existing open bucket might duplicate devices we already have. This is
1374          * moderately annoying.
1375          */
1376
1377         /* Short circuit all the fun stuff if possible: */
1378         if (ob->nr_ptrs >= nr_replicas)
1379                 return 0;
1380
1381         memset(caches_used, 0, sizeof(caches_used));
1382
1383         /*
1384          * Shuffle pointers to devices we already have to the end:
1385          * bch_bucket_alloc_set() will add new pointers to the start of @ob, and
1386          * bch_alloc_sectors_done() will add the first nr_replicas ptrs to @e:
1387          */
1388         for (i = dst = ob->nr_ptrs - 1; i >= 0; --i)
1389                 if (__test_and_set_bit(ob->ptrs[i].dev, caches_used)) {
1390                         if (i != dst) {
1391                                 swap(ob->ptrs[i], ob->ptrs[dst]);
1392                                 swap(ob->ptr_offset[i], ob->ptr_offset[dst]);
1393                         }
1394                         --dst;
1395                         nr_replicas++;
1396                 }
1397
1398         return bch_bucket_alloc_set(c, wp, ob, nr_replicas,
1399                                     reserve, caches_used, cl);
1400 }
1401
1402 /*
1403  * Get us an open_bucket we can allocate from, return with it locked:
1404  */
1405 struct open_bucket *bch_alloc_sectors_start(struct cache_set *c,
1406                                             struct write_point *wp,
1407                                             unsigned nr_replicas,
1408                                             enum alloc_reserve reserve,
1409                                             struct closure *cl)
1410 {
1411         struct open_bucket *ob;
1412         unsigned open_buckets_reserved = wp == &c->btree_write_point
1413                 ? 0 : BTREE_NODE_RESERVE;
1414         int ret;
1415
1416         BUG_ON(!wp->group);
1417         BUG_ON(!reserve);
1418         BUG_ON(!nr_replicas);
1419 retry:
1420         ob = lock_writepoint(c, wp);
1421
1422         /*
1423          * If ob->sectors_free == 0, one or more of the buckets ob points to is
1424          * full. We can't drop pointers from an open bucket - garbage collection
1425          * still needs to find them; instead, we must allocate a new open bucket
1426          * and copy any pointers to non-full buckets into the new open bucket.
1427          */
1428         if (!ob || ob->has_full_ptrs) {
1429                 struct open_bucket *new_ob;
1430
1431                 new_ob = bch_open_bucket_get(c, open_buckets_reserved, cl);
1432                 if (IS_ERR(new_ob))
1433                         return new_ob;
1434
1435                 mutex_lock(&new_ob->lock);
1436
1437                 /*
1438                  * We point the write point at the open_bucket before doing the
1439                  * allocation to avoid a race with shutdown:
1440                  */
1441                 if (race_fault() ||
1442                     cmpxchg(&wp->b, ob, new_ob) != ob) {
1443                         /* We raced: */
1444                         mutex_unlock(&new_ob->lock);
1445                         bch_open_bucket_put(c, new_ob);
1446
1447                         if (ob)
1448                                 mutex_unlock(&ob->lock);
1449                         goto retry;
1450                 }
1451
1452                 if (ob) {
1453                         open_bucket_copy_unused_ptrs(c, new_ob, ob);
1454                         mutex_unlock(&ob->lock);
1455                         bch_open_bucket_put(c, ob);
1456                 }
1457
1458                 ob = new_ob;
1459         }
1460
1461         ret = open_bucket_add_buckets(c, wp, ob, nr_replicas,
1462                                       reserve, cl);
1463         if (ret) {
1464                 mutex_unlock(&ob->lock);
1465                 return ERR_PTR(ret);
1466         }
1467
1468         ob->sectors_free = open_bucket_sectors_free(c, ob, nr_replicas);
1469
1470         BUG_ON(!ob->sectors_free);
1471         verify_not_stale(c, ob);
1472
1473         return ob;
1474 }
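/*
 * Note on the contract above: on success, @ob is returned with ob->lock held
 * and ob->sectors_free nonzero; the caller appends pointers with
 * bch_alloc_sectors_append_ptrs() and finishes with bch_alloc_sectors_done(),
 * which drops ob->lock.
 */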
1475
1476 /*
1477  * Append pointers to the space we just allocated to @k, and mark @sectors space
1478  * as allocated out of @ob
1479  */
1480 void bch_alloc_sectors_append_ptrs(struct cache_set *c, struct bkey_i_extent *e,
1481                                    unsigned nr_replicas, struct open_bucket *ob,
1482                                    unsigned sectors)
1483 {
1484         struct bch_extent_ptr tmp, *ptr;
1485         struct cache *ca;
1486         bool has_data = false;
1487         unsigned i;
1488
1489         /*
1490          * We're keeping any existing pointer @e has, and appending new pointers:
1491          * __bch_write() will only write to the pointers we add here:
1492          */
1493
1494         /*
1495          * XXX: don't add pointers to devices @e already has
1496          */
1497         BUG_ON(nr_replicas > ob->nr_ptrs);
1498         BUG_ON(sectors > ob->sectors_free);
1499
1500         /* didn't use all the ptrs: */
1501         if (nr_replicas < ob->nr_ptrs)
1502                 has_data = true;
1503
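        /*
         * ob->ptr_offset[i] is the write cursor within each bucket: the
         * pointer appended below points at the current cursor, and advancing
         * the cursor by @sectors is what consumes space out of @ob.
         */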
1504         for (i = 0; i < nr_replicas; i++) {
1505                 EBUG_ON(bch_extent_has_device(extent_i_to_s_c(e), ob->ptrs[i].dev));
1506
1507                 tmp = ob->ptrs[i];
1508                 tmp.cached = bkey_extent_is_cached(&e->k);
1509                 tmp.offset += ob->ptr_offset[i];
1510                 extent_ptr_append(e, tmp);
1511
1512                 ob->ptr_offset[i] += sectors;
1513         }
1514
1515         open_bucket_for_each_online_device(c, ob, ptr, ca)
1516                 this_cpu_add(*ca->sectors_written, sectors);
1517 }
1518
1519 /*
1520  * Done writing to @ob for now: mark which of its buckets are now full, and
1521  * if they all are, detach @ob from the write point; drops ob->lock.
1522  */
1523 void bch_alloc_sectors_done(struct cache_set *c, struct write_point *wp,
1524                             struct open_bucket *ob)
1525 {
1526         struct cache_member_rcu *mi = cache_member_info_get(c);
1527         bool has_data = false;
1528         unsigned i;
1529
1530         for (i = 0; i < ob->nr_ptrs; i++) {
1531                 if (!ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]))
1532                         ob->has_full_ptrs = true;
1533                 else
1534                         has_data = true;
1535         }
1536
1537         cache_member_info_put();
1538
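        /*
         * If any of @ob's buckets still have space, pin @ob so the write
         * point keeps using it; otherwise every bucket is full and we detach
         * @ob from the write point entirely:
         */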
1539         if (likely(has_data))
1540                 atomic_inc(&ob->pin);
1541         else
1542                 BUG_ON(xchg(&wp->b, NULL) != ob);
1543
1544         mutex_unlock(&ob->lock);
1545 }
1546
1547 /*
1548  * Allocates some space in the cache to write to, sets @e to point to the
1549  * newly allocated space, and updates e->k.size and e->k.offset (to point to
1550  * the end of the newly allocated space).
1551  *
1552  * May allocate fewer sectors than requested: e->k.size indicates how many
1553  * sectors were actually allocated.
1554  *
1555  * Returns an ERR_PTR on failure:
1556  * - -EAGAIN: closure was added to waitlist
1557  * - -ENOSPC: out of space and no closure provided
1558  *
1559  * @c  - cache set.
1560  * @wp - write point to use for allocating sectors.
1561  * @e  - extent key to return the allocated space information.
1562  * @cl - closure to wait for a bucket
1563  */
1564 struct open_bucket *bch_alloc_sectors(struct cache_set *c,
1565                                       struct write_point *wp,
1566                                       struct bkey_i_extent *e,
1567                                       unsigned nr_replicas,
1568                                       enum alloc_reserve reserve,
1569                                       struct closure *cl)
1570 {
1571         struct open_bucket *ob;
1572
1573         ob = bch_alloc_sectors_start(c, wp, nr_replicas, reserve, cl);
1574         if (IS_ERR_OR_NULL(ob))
1575                 return ob;
1576
1577         if (e->k.size > ob->sectors_free)
1578                 bch_key_resize(&e->k, ob->sectors_free);
1579
1580         bch_alloc_sectors_append_ptrs(c, e, nr_replicas, ob, e->k.size);
1581
1582         bch_alloc_sectors_done(c, wp, ob);
1583
1584         return ob;
1585 }
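/*
 * Illustrative caller sketch (an assumption about the surrounding write path,
 * not code from this file): a writer that has built a bkey_i_extent @e sized
 * to what it wants to write would do roughly
 *
 *      ob = bch_alloc_sectors(c, wp, e, nr_replicas, reserve, cl);
 *      if (IS_ERR_OR_NULL(ob))
 *              bail out or wait on @cl (-EAGAIN means the closure was put on
 *              a waitlist, -ENOSPC that no closure was provided);
 *
 *      ... submit the write to the pointers just appended to @e ...
 *
 * and then drop its reference to @ob with bch_open_bucket_put(c, ob) once the
 * data is on disk, as the write path in io.c is expected to do.
 */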
1586
1587 /* Startup/shutdown (ro/rw): */
1588
1589 static void bch_recalc_capacity(struct cache_set *c)
1590 {
1591         struct cache_group *tier = c->cache_tiers + ARRAY_SIZE(c->cache_tiers);
1592         struct cache *ca;
1593         u64 total_capacity, capacity = 0, reserved_sectors = 0;
1594         unsigned long ra_pages = 0;
1595         unsigned i, j;
1596
1597         rcu_read_lock();
1598         for_each_cache_rcu(ca, c, i) {
1599                 struct backing_dev_info *bdi =
1600                         blk_get_backing_dev_info(ca->disk_sb.bdev);
1601
1602                 ra_pages += bdi->ra_pages;
1603         }
1604
1605         c->bdi.ra_pages = ra_pages;
1606
1607         /*
1608          * Capacity of the cache set is the capacity of all the devices in the
1609          * slowest (highest) tier - we don't include lower tier devices.
1610          */
1611         for (tier = c->cache_tiers + ARRAY_SIZE(c->cache_tiers) - 1;
1612              tier > c->cache_tiers && !tier->nr_devices;
1613              --tier)
1614                 ;
1615
1616         group_for_each_cache_rcu(ca, tier, i) {
1617                 size_t reserve = 0;
1618
1619                 /*
1620                  * We need to reserve buckets (from the number
1621                  * of currently available buckets) against
1622                  * foreground writes so that mainly copygc can
1623                  * make forward progress.
1624                  *
1625                  * We need enough to refill the various reserves
1626                  * from scratch - copygc will use its entire
1627          * reserve all at once, then run again when
1628                  * its reserve is refilled (from the formerly
1629                  * available buckets).
1630                  *
1631                  * This reserve is just used when considering if
1632                  * allocations for foreground writes must wait -
1633                  * not -ENOSPC calculations.
1634                  */
1635                 for (j = 0; j < RESERVE_NONE; j++)
1636                         reserve += ca->free[j].size;
1637
1638                 reserve += ca->free_inc.size;
1639
1640                 reserve += ARRAY_SIZE(c->write_points);
1641
1642                 if (ca->mi.tier)
1643                         reserve += 1;   /* tiering write point */
1644                 reserve += 1;           /* btree write point */
1645
1646                 reserved_sectors += reserve << ca->bucket_bits;
1647
1648                 capacity += (ca->mi.nbuckets -
1649                              ca->mi.first_bucket) <<
1650                         ca->bucket_bits;
1651         }
1652         rcu_read_unlock();
1653
1654         total_capacity = capacity;
1655
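        /*
         * Only advertise (100 - gc_reserve_percent)% of the raw capacity as
         * usable: e.g. with gc_reserve_percent == 10, a tier with 1000
         * sectors of raw capacity is reported as 900.
         */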
1656         capacity *= (100 - c->opts.gc_reserve_percent);
1657         capacity = div64_u64(capacity, 100);
1658
1659         BUG_ON(capacity + reserved_sectors > total_capacity);
1660
1661         c->capacity = capacity;
1662
1663         if (c->capacity) {
1664                 bch_io_timer_add(&c->io_clock[READ],
1665                                  &c->prio_clock[READ].rescale);
1666                 bch_io_timer_add(&c->io_clock[WRITE],
1667                                  &c->prio_clock[WRITE].rescale);
1668         } else {
1669                 bch_io_timer_del(&c->io_clock[READ],
1670                                  &c->prio_clock[READ].rescale);
1671                 bch_io_timer_del(&c->io_clock[WRITE],
1672                                  &c->prio_clock[WRITE].rescale);
1673         }
1674
1675         /* Wake up in case someone was waiting for buckets */
1676         closure_wake_up(&c->freelist_wait);
1677 }
1678
1679 static void bch_stop_write_point(struct cache *ca,
1680                                  struct write_point *wp)
1681 {
1682         struct cache_set *c = ca->set;
1683         struct open_bucket *ob;
1684         struct bch_extent_ptr *ptr;
1685
1686         ob = lock_writepoint(c, wp);
1687         if (!ob)
1688                 return;
1689
1690         for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
1691                 if (ptr->dev == ca->dev_idx)
1692                         goto found;
1693
1694         mutex_unlock(&ob->lock);
1695         return;
1696 found:
1697         BUG_ON(xchg(&wp->b, NULL) != ob);
1698         mutex_unlock(&ob->lock);
1699
1700         /* Drop writepoint's ref: */
1701         bch_open_bucket_put(c, ob);
1702 }
1703
1704 static bool bch_dev_has_open_write_point(struct cache *ca)
1705 {
1706         struct cache_set *c = ca->set;
1707         struct bch_extent_ptr *ptr;
1708         struct open_bucket *ob;
1709
1710         for (ob = c->open_buckets;
1711              ob < c->open_buckets + ARRAY_SIZE(c->open_buckets);
1712              ob++)
1713                 if (atomic_read(&ob->pin)) {
1714                         mutex_lock(&ob->lock);
1715                         for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
1716                                 if (ptr->dev == ca->dev_idx) {
1717                                         mutex_unlock(&ob->lock);
1718                                         return true;
1719                                 }
1720                         mutex_unlock(&ob->lock);
1721                 }
1722
1723         return false;
1724 }
1725
1726 /* device goes ro: */
1727 void bch_dev_allocator_stop(struct cache *ca)
1728 {
1729         struct cache_set *c = ca->set;
1730         struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
1731         struct task_struct *p;
1732         struct closure cl;
1733         unsigned i;
1734
1735         closure_init_stack(&cl);
1736
1737         /* First, remove device from allocation groups: */
1738
1739         bch_dev_group_remove(tier, ca);
1740         bch_dev_group_remove(&c->cache_all, ca);
1741
1742         bch_recalc_capacity(c);
1743
1744         /*
1745          * Stopping the allocator thread comes after removing from allocation
1746          * groups, else pending allocations will hang:
1747          */
1748
1749         p = ca->alloc_thread;
1750         ca->alloc_thread = NULL;
1751         smp_wmb();
1752
1753         /*
1754          * We need an rcu barrier between setting ca->alloc_thread = NULL and
1755          * the thread shutting down to avoid a race with bucket_stats_update() -
1756          * the allocator thread itself does a synchronize_rcu() on exit.
1757          *
1758          * XXX: it would be better to have the rcu barrier be asynchronous
1759          * instead of blocking us here
1760          */
1761         if (p) {
1762                 kthread_stop(p);
1763                 put_task_struct(p);
1764         }
1765
1766         /* Next, close write points that point to this device... */
1767
1768         for (i = 0; i < ARRAY_SIZE(c->write_points); i++)
1769                 bch_stop_write_point(ca, &c->write_points[i]);
1770
1771         bch_stop_write_point(ca, &ca->copygc_write_point);
1772         bch_stop_write_point(ca, &c->promote_write_point);
1773         bch_stop_write_point(ca, &ca->tiering_write_point);
1774         bch_stop_write_point(ca, &c->migration_write_point);
1775         bch_stop_write_point(ca, &c->btree_write_point);
1776
1777         mutex_lock(&c->btree_reserve_cache_lock);
1778         while (c->btree_reserve_cache_nr) {
1779                 struct btree_alloc *a =
1780                         &c->btree_reserve_cache[--c->btree_reserve_cache_nr];
1781
1782                 bch_open_bucket_put(c, a->ob);
1783         }
1784         mutex_unlock(&c->btree_reserve_cache_lock);
1785
1786         /* Avoid deadlocks.. */
1787
1788         closure_wake_up(&c->freelist_wait);
1789         wake_up(&c->journal.wait);
1790
1791         /* Now wait for any in flight writes: */
1792
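        /*
         * Closure wait loop: put ourselves on the waitlist before rechecking
         * the condition, so a wakeup between the check and closure_sync()
         * can't be lost.
         */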
1793         while (1) {
1794                 closure_wait(&c->open_buckets_wait, &cl);
1795
1796                 if (!bch_dev_has_open_write_point(ca)) {
1797                         closure_wake_up(&c->open_buckets_wait);
1798                         break;
1799                 }
1800
1801                 closure_sync(&cl);
1802         }
1803 }
1804
1805 /*
1806  * Startup the allocator thread for transition to RW mode:
1807  */
1808 int bch_dev_allocator_start(struct cache *ca)
1809 {
1810         struct cache_set *c = ca->set;
1811         struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
1812         struct task_struct *k;
1813
1814         /*
1815          * allocator thread already started?
1816          */
1817         if (ca->alloc_thread)
1818                 return 0;
1819
1820         k = kthread_create(bch_allocator_thread, ca, "bcache_allocator");
1821         if (IS_ERR(k))
1822                 return PTR_ERR(k);
1823
1824         get_task_struct(k);
1825         ca->alloc_thread = k;
1826
1827         bch_dev_group_add(tier, ca);
1828         bch_dev_group_add(&c->cache_all, ca);
1829
1830         bch_recalc_capacity(c);
1831
1832         /*
1833          * Don't wake up allocator thread until after adding device to
1834          * allocator groups - otherwise, alloc thread could get a spurious
1835          * -EROFS due to prio_write() -> journal_meta() not finding any devices:
1836          */
1837         wake_up_process(k);
1838         return 0;
1839 }
1840
1841 void bch_open_buckets_init(struct cache_set *c)
1842 {
1843         unsigned i;
1844
1845         INIT_LIST_HEAD(&c->open_buckets_open);
1846         INIT_LIST_HEAD(&c->open_buckets_free);
1847         spin_lock_init(&c->open_buckets_lock);
1848         bch_prio_timer_init(c, READ);
1849         bch_prio_timer_init(c, WRITE);
1850
1851         /* open bucket 0 is a sentinel NULL: */
1852         mutex_init(&c->open_buckets[0].lock);
1853         INIT_LIST_HEAD(&c->open_buckets[0].list);
1854
1855         for (i = 1; i < ARRAY_SIZE(c->open_buckets); i++) {
1856                 mutex_init(&c->open_buckets[i].lock);
1857                 c->open_buckets_nr_free++;
1858                 list_add(&c->open_buckets[i].list, &c->open_buckets_free);
1859         }
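        /* bucket 0 is skipped above, so open_buckets_nr_free ends up one
         * less than ARRAY_SIZE(c->open_buckets) */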
1860
1861         spin_lock_init(&c->cache_all.lock);
1862
1863         for (i = 0; i < ARRAY_SIZE(c->write_points); i++) {
1864                 c->write_points[i].throttle = true;
1865                 c->write_points[i].group = &c->cache_tiers[0];
1866         }
1867
1868         for (i = 0; i < ARRAY_SIZE(c->cache_tiers); i++)
1869                 spin_lock_init(&c->cache_tiers[i].lock);
1870
1871         c->promote_write_point.group = &c->cache_tiers[0];
1872
1873         c->migration_write_point.group = &c->cache_all;
1874
1875         c->btree_write_point.group = &c->cache_all;
1876
1877         c->pd_controllers_update_seconds = 5;
1878         INIT_DELAYED_WORK(&c->pd_controllers_update, pd_controllers_update);
1879
1880         spin_lock_init(&c->foreground_write_pd_lock);
1881         bch_pd_controller_init(&c->foreground_write_pd);
1882         /*
1883          * We do not want the write rate to have an effect on the computed
1884          * rate, for the following reason:
1885          *
1886          * We do not call bch_ratelimit_delay() at all if the write rate
1887          * exceeds 1GB/s. In this case, the PD controller will think we are
1888          * not "keeping up" and not change the rate.
1889          */
1890         c->foreground_write_pd.backpressure = 0;
1891         init_timer(&c->foreground_write_wakeup);
1892
1893         c->foreground_write_wakeup.data = (unsigned long) c;
1894         c->foreground_write_wakeup.function = bch_wake_delayed_writes;
1895 }