/*
 * Primary bucket allocation code
 *
 * Copyright 2012 Google, Inc.
 *
 * Allocation in bcache is done in terms of buckets:
 *
 * Each bucket has an associated 8 bit gen; this gen corresponds to the gen in
 * btree pointers - they must match for the pointer to be considered valid.
 *
 * Thus (assuming a bucket has no dirty data or metadata in it) we can reuse a
 * bucket simply by incrementing its gen.
 *
 * The gens (along with the priorities; it's really the gens that are important
 * but the code is named as if it's the priorities) are written in an arbitrary
 * list of buckets on disk, with a pointer to them in the journal header.
 *
 * When we invalidate a bucket, we have to write its new gen to disk and wait
 * for that write to complete before we use it - otherwise after a crash we
 * could have pointers that appeared to be good but pointed to data that had
 * been overwritten.
 *
 * Since the gens and priorities are all stored contiguously on disk, we can
 * batch this up: We fill up the free_inc list with freshly invalidated buckets,
 * call prio_write(), and when prio_write() finishes we pull buckets off the
 * free_inc list and optionally discard them.
 *
 * free_inc isn't the only freelist - if it was, we'd often have to sleep while
 * priorities and gens were being written before we could allocate. c->free is a
 * smaller freelist, and buckets on that list are always ready to be used.
 *
 * If we've got discards enabled, that happens when a bucket moves from the
 * free_inc list to the free list.
 *
 * It's important to ensure that gens don't wrap around - with respect to
 * either the oldest gen in the btree or the gen on disk. This is quite
 * difficult to do in practice, but we explicitly guard against it anyway - if
 * a bucket is in danger of wrapping around we simply skip invalidating it that
 * time around, and we garbage collect or rewrite the priorities sooner than we
 * would have otherwise.
 *
 * bch_bucket_alloc() allocates a single bucket from a specific cache.
 *
 * bch_bucket_alloc_set() allocates one or more buckets from different caches
 * out of a cache set.
 *
 * invalidate_buckets() drives all the processes described above. It's called
 * from bch_bucket_alloc() and a few other places that need to make sure free
 * buckets are ready.
 *
 * invalidate_buckets_(lru|fifo)() find buckets that are available to be
 * invalidated, and then invalidate them and stick them on the free_inc list -
 * in either lru or fifo order.
 */
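/*
 * Illustration of the gen-matching rule above - a minimal standalone sketch,
 * not code used by this file (the struct and function names here are made up;
 * the real check is ptr_stale()):
 *
 *	struct toy_bucket  { u8 gen; };
 *	struct toy_pointer { u8 gen; };
 *
 *	static bool toy_ptr_stale(struct toy_bucket *b, struct toy_pointer *p)
 *	{
 *		return b->gen != p->gen;
 *	}
 *
 * Incrementing b->gen invalidates every extant pointer into the bucket at
 * once, without having to find or rewrite the pointers themselves.
 */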
#include "btree_update.h"

#include <linux/blkdev.h>
#include <linux/kthread.h>
#include <linux/math64.h>
#include <linux/random.h>
#include <linux/rcupdate.h>
#include <trace/events/bcache.h>
static size_t bch_bucket_alloc(struct cache *, enum alloc_reserve);
static void __bch_bucket_free(struct cache *, struct bucket *);

/* Allocation groups: */
void bch_cache_group_remove_cache(struct cache_group *grp, struct cache *ca)
{
        unsigned i;

        spin_lock(&grp->lock);

        for (i = 0; i < grp->nr_devices; i++)
                if (rcu_access_pointer(grp->d[i].dev) == ca) {
                        grp->nr_devices--;
                        memmove(&grp->d[i],
                                &grp->d[i + 1],
                                (grp->nr_devices - i) * sizeof(grp->d[0]));
                        break;
                }

        spin_unlock(&grp->lock);
}
void bch_cache_group_add_cache(struct cache_group *grp, struct cache *ca)
{
        unsigned i;

        spin_lock(&grp->lock);
        for (i = 0; i < grp->nr_devices; i++)
                if (rcu_access_pointer(grp->d[i].dev) == ca)
                        goto out;

        BUG_ON(grp->nr_devices >= BCH_SB_MEMBERS_MAX);

        rcu_assign_pointer(grp->d[grp->nr_devices++].dev, ca);
out:
        spin_unlock(&grp->lock);
}
/* Ratelimiting/PD controllers */

static void pd_controllers_update(struct work_struct *work)
{
        struct cache_set *c = container_of(to_delayed_work(work),
                                           struct cache_set,
                                           pd_controllers_update);
        struct cache *ca;
        unsigned iter;
        int i;

        /* All units are in bytes */
        u64 tier_size[BCH_TIER_MAX];
        u64 tier_free[BCH_TIER_MAX];
        u64 tier_dirty[BCH_TIER_MAX];
        u64 tier0_can_free = 0;

        memset(tier_size, 0, sizeof(tier_size));
        memset(tier_free, 0, sizeof(tier_free));
        memset(tier_dirty, 0, sizeof(tier_dirty));

        rcu_read_lock();
        for (i = BCH_TIER_MAX - 1; i >= 0; --i)
                group_for_each_cache_rcu(ca, &c->cache_tiers[i], iter) {
                        struct bucket_stats_cache stats = bch_bucket_stats_read_cache(ca);
                        unsigned bucket_bits = ca->bucket_bits + 9;

                        /*
                         * Bytes of internal fragmentation, which can be
                         * reclaimed by copy GC
                         */
                        s64 fragmented = ((stats.buckets_dirty +
                                           stats.buckets_cached) <<
                                          bucket_bits) -
                                ((stats.sectors_dirty +
                                  stats.sectors_cached) << 9);

                        u64 dev_size = (ca->mi.nbuckets -
                                        ca->mi.first_bucket) << bucket_bits;

                        u64 free = __buckets_free_cache(ca, stats) << bucket_bits;

                        if (fragmented < 0)
                                fragmented = 0;

                        bch_pd_controller_update(&ca->moving_gc_pd,
                                                 free, fragmented, -1);

                        if (!i)
                                tier0_can_free += fragmented;

                        tier_size[i] += dev_size;
                        tier_free[i] += free;
                        tier_dirty[i] += stats.buckets_dirty << bucket_bits;
                }
        rcu_read_unlock();

        if (tier_size[1]) {
                u64 target = div_u64(tier_size[0] * c->tiering_percent, 100);

                tier0_can_free = max_t(s64, 0, tier_dirty[0] - target);

                bch_pd_controller_update(&c->tiering_pd,
                                         target,
                                         tier_dirty[0],
                                         -1);
        }

        /*
         * Throttle foreground writes if tier 0 is running out of free buckets,
         * and either tiering or copygc can free up space (but don't take both
         * into account).
         *
         * Target will be small if there isn't any work to do - we don't want to
         * throttle foreground writes if we currently have all the free space
         * we're ever going to have.
         *
         * Otherwise, if there's work to do, try to keep 20% of tier0 available
         * for foreground writes.
         */
        bch_pd_controller_update(&c->foreground_write_pd,
                                 min(tier0_can_free,
                                     div_u64(tier_size[0] *
                                             c->foreground_target_percent,
                                             100)),
                                 tier_free[0],
                                 -1);

        schedule_delayed_work(&c->pd_controllers_update,
                              c->pd_controllers_update_seconds * HZ);
}
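/*
 * Worked example of the fragmentation calculation above (all numbers are
 * illustrative assumptions): a device with 128 KiB buckets (bucket_bits = 17)
 * holding 10 dirty + 6 cached buckets that together contain only 1536 sectors
 * of live data:
 *
 *	fragmented = ((10 + 6) << 17) - (1536 << 9)
 *	           = 2097152 - 786432
 *	           = 1310720 bytes
 *
 * i.e. 1.25 MiB that copy GC could reclaim by compacting the live data into
 * fewer buckets.
 */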
/*
 * Bucket priorities/gens:
 *
 * For each bucket, we store on disk its read and write priorities and its
 * gen.
 *
 * See alloc.c for an explanation of the gen. The priority is used to implement
 * lru (and in the future other) cache replacement policies; for most purposes
 * it's just an opaque integer.
 *
 * The gens and the priorities don't have a whole lot to do with each other, and
 * it's actually the gens that must be written out at specific times - it's no
 * big deal if the priorities don't get written, if we lose them we just reuse
 * buckets in suboptimal order.
 *
 * On disk they're stored in a packed array, and in as many buckets as are
 * required to fit them all. The buckets we use to store them form a list; the
 * journal header points to the first bucket, the first bucket points to the
 * second bucket, and so on.
 *
 * This code is used by the allocation code; periodically (whenever it runs out
 * of buckets to allocate from) the allocation code will invalidate some
 * buckets, but it can't use those buckets until their new gens are safely on
 * disk.
 */
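/*
 * Sketch of walking the on-disk layout described above (read_prio_bucket() is
 * a made-up helper for illustration; bch_prio_read() below is the real
 * implementation):
 *
 *	u64 bucket = journal_header_prio_bucket_ptr;
 *
 *	while (bucket) {
 *		struct prio_set *p = read_prio_bucket(bucket);
 *
 *		// consume p->data[0 .. prios_per_bucket(ca) - 1]
 *		bucket = le64_to_cpu(p->next_bucket);
 *	}
 */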
static int prio_io(struct cache *ca, uint64_t bucket, int op)
{
        bio_init(ca->bio_prio);
        bio_set_op_attrs(ca->bio_prio, op, REQ_SYNC|REQ_META);

        ca->bio_prio->bi_max_vecs       = bucket_pages(ca);
        ca->bio_prio->bi_io_vec         = ca->bio_prio->bi_inline_vecs;
        ca->bio_prio->bi_iter.bi_sector = bucket * ca->mi.bucket_size;
        ca->bio_prio->bi_bdev           = ca->disk_sb.bdev;
        ca->bio_prio->bi_iter.bi_size   = bucket_bytes(ca);
        bch_bio_map(ca->bio_prio, ca->disk_buckets);

        return submit_bio_wait(ca->bio_prio);
}
static struct nonce prio_nonce(struct prio_set *p)
{
        return (struct nonce) {{
                [0] = 0,
                [1] = p->nonce[0],
                [2] = p->nonce[1],
                [3] = p->nonce[2]^BCH_NONCE_PRIO,
        }};
}
static int bch_prio_write(struct cache *ca)
{
        struct cache_set *c = ca->set;
        struct journal *j = &c->journal;
        struct journal_res res = { 0 };
        bool need_new_journal_entry;
        int i, ret;

        if (c->opts.nochanges)
                return 0;

        trace_bcache_prio_write_start(ca);

        atomic64_add(ca->mi.bucket_size * prio_buckets(ca),
                     &ca->meta_sectors_written);

        for (i = prio_buckets(ca) - 1; i >= 0; --i) {
                struct prio_set *p = ca->disk_buckets;
                struct bucket_disk *d = p->data;
                struct bucket_disk *end = d + prios_per_bucket(ca);
                size_t r;

                for (r = i * prios_per_bucket(ca);
                     r < ca->mi.nbuckets && d < end;
                     r++, d++) {
                        struct bucket *g = ca->buckets + r;

                        d->read_prio = cpu_to_le16(g->read_prio);
                        d->write_prio = cpu_to_le16(g->write_prio);
                        d->gen = ca->buckets[r].mark.gen;
                }

                p->next_bucket  = cpu_to_le64(ca->prio_buckets[i + 1]);
                p->magic        = cpu_to_le64(pset_magic(c));
                get_random_bytes(&p->nonce, sizeof(p->nonce));

                spin_lock(&ca->prio_buckets_lock);
                r = bch_bucket_alloc(ca, RESERVE_PRIO);
                BUG_ON(!r);

                /*
                 * Recording and marking the new prio bucket
                 * goes here before dropping prio_buckets_lock to guard against
                 * it getting gc'd from under us
                 */
                ca->prio_buckets[i] = r;
                bch_mark_metadata_bucket(ca, ca->buckets + r, false);
                spin_unlock(&ca->prio_buckets_lock);

                SET_PSET_CSUM_TYPE(p, bch_meta_checksum_type(c));

                bch_encrypt(c, PSET_CSUM_TYPE(p),
                            prio_nonce(p),
                            p->encrypted_start,
                            bucket_bytes(ca) -
                            offsetof(struct prio_set, encrypted_start));

                p->csum = bch_checksum(c, PSET_CSUM_TYPE(p),
                                       prio_nonce(p),
                                       (void *) p + sizeof(p->csum),
                                       bucket_bytes(ca) - sizeof(p->csum));

                ret = prio_io(ca, r, REQ_OP_WRITE);
                if (cache_fatal_io_err_on(ret, ca,
                                          "prio write to bucket %zu", r) ||
                    bch_meta_write_fault("prio"))
                        return -EIO;
        }

        spin_lock(&j->lock);
        j->prio_buckets[ca->dev_idx] = cpu_to_le64(ca->prio_buckets[0]);
        j->nr_prio_buckets = max_t(unsigned,
                                   ca->dev_idx + 1,
                                   j->nr_prio_buckets);
        spin_unlock(&j->lock);

        do {
                unsigned u64s = jset_u64s(0);

                ret = bch_journal_res_get(j, &res, u64s, u64s);
                if (ret)
                        return ret;

                need_new_journal_entry = j->buf[res.idx].nr_prio_buckets <
                        ca->dev_idx + 1;
                bch_journal_res_put(j, &res);

                ret = bch_journal_flush_seq(j, res.seq);
                if (ret)
                        return ret;
        } while (need_new_journal_entry);

        /*
         * Don't want the old priorities to get garbage collected until after we
         * finish writing the new ones, and they're journalled
         */

        spin_lock(&ca->prio_buckets_lock);

        for (i = 0; i < prio_buckets(ca); i++) {
                if (ca->prio_last_buckets[i])
                        __bch_bucket_free(ca,
                                &ca->buckets[ca->prio_last_buckets[i]]);

                ca->prio_last_buckets[i] = ca->prio_buckets[i];
        }

        spin_unlock(&ca->prio_buckets_lock);

        trace_bcache_prio_write_end(ca);
        return 0;
}
int bch_prio_read(struct cache *ca)
{
        struct cache_set *c = ca->set;
        struct prio_set *p = ca->disk_buckets;
        struct bucket_disk *d = p->data + prios_per_bucket(ca), *end = d;
        struct bucket_mark new;
        struct bch_csum csum;
        unsigned bucket_nr = 0;
        u64 bucket, expect, got;
        size_t b;
        int ret = 0;

        spin_lock(&c->journal.lock);
        bucket = le64_to_cpu(c->journal.prio_buckets[ca->dev_idx]);
        spin_unlock(&c->journal.lock);

        /*
         * If the device hasn't been used yet, there won't be a prio bucket ptr
         */
        if (!bucket)
                return 0;

        unfixable_fsck_err_on(bucket < ca->mi.first_bucket ||
                              bucket >= ca->mi.nbuckets, c,
                              "bad prio bucket %llu", bucket);

        for (b = 0; b < ca->mi.nbuckets; b++, d++) {
                if (d == end) {
                        ca->prio_last_buckets[bucket_nr] = bucket;
                        bucket_nr++;

                        ret = prio_io(ca, bucket, REQ_OP_READ);
                        if (cache_fatal_io_err_on(ret, ca,
                                        "prio read from bucket %llu",
                                        bucket) ||
                            bch_meta_read_fault("prio"))
                                return -EIO;

                        got = le64_to_cpu(p->magic);
                        expect = pset_magic(c);
                        unfixable_fsck_err_on(got != expect, c,
                                "bad magic (got %llu expect %llu) while reading prios from bucket %llu",
                                got, expect, bucket);

                        unfixable_fsck_err_on(PSET_CSUM_TYPE(p) >= BCH_CSUM_NR, c,
                                "prio bucket with unknown csum type %llu bucket %llu",
                                PSET_CSUM_TYPE(p), bucket);

                        csum = bch_checksum(c, PSET_CSUM_TYPE(p),
                                            prio_nonce(p),
                                            (void *) p + sizeof(p->csum),
                                            bucket_bytes(ca) - sizeof(p->csum));
                        unfixable_fsck_err_on(bch_crc_cmp(csum, p->csum), c,
                                "bad checksum reading prios from bucket %llu",
                                bucket);

                        bch_encrypt(c, PSET_CSUM_TYPE(p),
                                    prio_nonce(p),
                                    p->encrypted_start,
                                    bucket_bytes(ca) -
                                    offsetof(struct prio_set, encrypted_start));

                        bucket = le64_to_cpu(p->next_bucket);
                        d = p->data;
                }

                ca->buckets[b].read_prio = le16_to_cpu(d->read_prio);
                ca->buckets[b].write_prio = le16_to_cpu(d->write_prio);

                bucket_cmpxchg(&ca->buckets[b], new, new.gen = d->gen);
        }

        return 0;
fsck_err:
        return -EIO;
}
#define BUCKET_GC_GEN_MAX	96U
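/*
 * The wraparound guard, illustrated (a standalone sketch, not this file's
 * bucket_gc_gen()): with 8 bit gens, the "distance" a bucket's gen has
 * advanced since GC last rewrote stale pointers is computed with modular
 * arithmetic:
 *
 *	u8 gen_distance(u8 gen, u8 oldest_gen)
 *	{
 *		return gen - oldest_gen;	// wraps mod 256
 *	}
 *
 * A bucket whose distance reaches BUCKET_GC_GEN_MAX (96) is skipped by the
 * invalidate path below, leaving plenty of headroom short of 255, where the
 * comparison would become ambiguous.
 */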
/**
 * wait_buckets_available - wait on reclaimable buckets
 *
 * If there aren't enough available buckets to fill up free_inc, wait until
 * there are.
 */
static int wait_buckets_available(struct cache *ca)
{
        struct cache_set *c = ca->set;
        int ret = 0;

        while (1) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (kthread_should_stop()) {
                        ret = -1;
                        break;
                }

                if (ca->inc_gen_needs_gc >= fifo_free(&ca->free_inc)) {
                        if (ca->set->gc_thread) {
                                trace_bcache_gc_cannot_inc_gens(ca->set);
                                atomic_inc(&c->kick_gc);
                                wake_up_process(ca->set->gc_thread);
                        }

                        /*
                         * We are going to wait for GC to wake us up, even if
                         * bucket counters tell us enough buckets are available,
                         * because we are actually waiting for GC to rewrite
                         * nodes with stale pointers
                         */
                } else if (buckets_available_cache(ca) >=
                           fifo_free(&ca->free_inc))
                        break;

                up_read(&ca->set->gc_lock);
                schedule();
                try_to_freeze();
                down_read(&ca->set->gc_lock);
        }

        __set_current_state(TASK_RUNNING);
        return ret;
}
static void verify_not_on_freelist(struct cache *ca, size_t bucket)
{
        if (expensive_debug_checks(ca->set)) {
                size_t iter;
                long i;
                unsigned j;

                for (iter = 0; iter < prio_buckets(ca) * 2; iter++)
                        BUG_ON(ca->prio_buckets[iter] == bucket);

                for (j = 0; j < RESERVE_NR; j++)
                        fifo_for_each_entry(i, &ca->free[j], iter)
                                BUG_ON(i == bucket);

                fifo_for_each_entry(i, &ca->free_inc, iter)
                        BUG_ON(i == bucket);
        }
}
/* Bucket heap / gen */
void bch_recalc_min_prio(struct cache *ca, int rw)
{
        struct cache_set *c = ca->set;
        struct prio_clock *clock = &c->prio_clock[rw];
        struct bucket *g;
        u16 max_delta = 1;
        unsigned i;

        /* Determine min prio for this particular cache */
        for_each_bucket(g, ca)
                max_delta = max(max_delta, (u16) (clock->hand - g->prio[rw]));

        ca->min_prio[rw] = clock->hand - max_delta;

        /*
         * This may possibly increase the min prio for the whole cache, check
         * that as well.
         */
        max_delta = 1;

        for_each_cache(ca, c, i)
                max_delta = max(max_delta,
                                (u16) (clock->hand - ca->min_prio[rw]));

        clock->min_prio = clock->hand - max_delta;
}
static void bch_rescale_prios(struct cache_set *c, int rw)
{
        struct prio_clock *clock = &c->prio_clock[rw];
        struct cache *ca;
        struct bucket *g;
        unsigned i;

        trace_bcache_rescale_prios(c);

        for_each_cache(ca, c, i) {
                for_each_bucket(g, ca)
                        g->prio[rw] = clock->hand -
                                (clock->hand - g->prio[rw]) / 2;

                bch_recalc_min_prio(ca, rw);
        }
}
static void bch_inc_clock_hand(struct io_timer *timer)
{
        struct prio_clock *clock = container_of(timer,
                                        struct prio_clock, rescale);
        struct cache_set *c = container_of(clock,
                                        struct cache_set, prio_clock[clock->rw]);
        u64 capacity;

        mutex_lock(&c->bucket_lock);

        clock->hand++;

        /* if clock cannot be advanced more, rescale prio */
        if (clock->hand == (u16) (clock->min_prio - 1))
                bch_rescale_prios(c, clock->rw);

        mutex_unlock(&c->bucket_lock);

        capacity = READ_ONCE(c->capacity);

        if (!capacity)
                return;

        /*
         * we only increment when 0.1% of the cache_set has been read
         * or written to, this determines if it's time
         *
         * XXX: we shouldn't really be going off of the capacity of devices in
         * RW mode (that will be 0 when we're RO, yet we can still service
         * reads)
         */
        timer->expire += capacity >> 10;

        bch_io_timer_add(&c->io_clock[clock->rw], timer);
}
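/*
 * The re-arm arithmetic above, worked through (numbers are illustrative):
 * capacity is in 512-byte sectors, so for a 1 TB cache set,
 *
 *	capacity >> 10 = (2^31 sectors) / 1024 = 2^21 sectors = 1 GiB
 *
 * i.e. the prio clock hand advances once per roughly 0.1% of the cache set's
 * capacity read (or written, for the WRITE clock).
 */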
static void bch_prio_timer_init(struct cache_set *c, int rw)
{
        struct prio_clock *clock = &c->prio_clock[rw];
        struct io_timer *timer = &clock->rescale;

        clock->rw = rw;
        timer->fn = bch_inc_clock_hand;
        timer->expire = c->capacity >> 10;
}
/*
 * Background allocation thread: scans for buckets to be invalidated,
 * invalidates them, rewrites prios/gens (marking them as invalidated on disk),
 * then optionally issues discard commands to the newly free buckets, then puts
 * them on the various freelists.
 */
static inline bool can_inc_bucket_gen(struct cache *ca, struct bucket *g)
{
        return bucket_gc_gen(ca, g) < BUCKET_GC_GEN_MAX;
}
static bool bch_can_invalidate_bucket(struct cache *ca, struct bucket *g)
{
        if (!is_available_bucket(READ_ONCE(g->mark)))
                return false;

        if (bucket_gc_gen(ca, g) >= BUCKET_GC_GEN_MAX - 1)
                ca->inc_gen_needs_gc++;

        return can_inc_bucket_gen(ca, g);
}
static void bch_invalidate_one_bucket(struct cache *ca, struct bucket *g)
{
        spin_lock(&ca->freelist_lock);

        bch_invalidate_bucket(ca, g);

        g->read_prio = ca->set->prio_clock[READ].hand;
        g->write_prio = ca->set->prio_clock[WRITE].hand;

        verify_not_on_freelist(ca, g - ca->buckets);
        BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));

        spin_unlock(&ca->freelist_lock);
}
/*
 * Determines in what order we're going to reuse buckets, smallest
 * bucket_sort_key() first.
 *
 * - We take into account the read prio of the bucket, which gives us an
 *   indication of how hot the data is -- we scale the prio so that the prio
 *   farthest from the clock is worth 1/8th of the closest.
 *
 * - The number of sectors of cached data in the bucket, which gives us an
 *   indication of the cost in cache misses this eviction will cause.
 *
 * - The difference between the bucket's current gen and oldest gen of any
 *   pointer into it, which gives us an indication of the cost of an eventual
 *   btree GC to rewrite nodes with stale pointers.
 */

#define bucket_sort_key(g)						\
({									\
	unsigned long prio = g->read_prio - ca->min_prio[READ];		\
	prio = (prio * 7) / (ca->set->prio_clock[READ].hand -		\
			     ca->min_prio[READ]);			\
									\
	(((prio + 1) * bucket_sectors_used(g)) << 8) | bucket_gc_gen(ca, g);\
})
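/*
 * Worked example of bucket_sort_key() (all values are assumptions): with
 * ca->min_prio[READ] = 1000, clock hand = 2000, and a bucket with
 * read_prio = 1500, 100 sectors used, and gc gen 3:
 *
 *	prio = 1500 - 1000 = 500
 *	prio = (500 * 7) / (2000 - 1000) = 3
 *	key  = (((3 + 1) * 100) << 8) | 3 = 102403
 *
 * Colder buckets (lower prio), emptier buckets, and buckets with older
 * pointers into them all produce smaller keys, so they're reused first.
 */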
static void invalidate_buckets_lru(struct cache *ca)
{
        struct bucket_heap_entry e;
        struct bucket *g;
        unsigned i;

        mutex_lock(&ca->heap_lock);

        ca->heap.used = 0;

        mutex_lock(&ca->set->bucket_lock);
        bch_recalc_min_prio(ca, READ);
        bch_recalc_min_prio(ca, WRITE);

        /*
         * Find buckets with lowest read priority, by building a maxheap sorted
         * by read priority and repeatedly replacing the maximum element until
         * all buckets have been visited.
         */
        for_each_bucket(g, ca) {
                if (!bch_can_invalidate_bucket(ca, g))
                        continue;

                bucket_heap_push(ca, g, bucket_sort_key(g));
        }

        /* Sort buckets by physical location on disk for better locality */
        for (i = 0; i < ca->heap.used; i++) {
                struct bucket_heap_entry *e = &ca->heap.data[i];

                e->val = e->g - ca->buckets;
        }

        heap_resort(&ca->heap, bucket_max_cmp);

        /*
         * If we run out of buckets to invalidate, bch_allocator_thread() will
         * kick stuff and retry us
         */
        while (!fifo_full(&ca->free_inc) &&
               heap_pop(&ca->heap, e, bucket_max_cmp)) {
                BUG_ON(!bch_can_invalidate_bucket(ca, e.g));
                bch_invalidate_one_bucket(ca, e.g);
        }

        mutex_unlock(&ca->set->bucket_lock);
        mutex_unlock(&ca->heap_lock);
}
static void invalidate_buckets_fifo(struct cache *ca)
{
        struct bucket *g;
        size_t checked = 0;

        while (!fifo_full(&ca->free_inc)) {
                if (ca->fifo_last_bucket < ca->mi.first_bucket ||
                    ca->fifo_last_bucket >= ca->mi.nbuckets)
                        ca->fifo_last_bucket = ca->mi.first_bucket;

                g = ca->buckets + ca->fifo_last_bucket++;

                if (bch_can_invalidate_bucket(ca, g))
                        bch_invalidate_one_bucket(ca, g);

                if (++checked >= ca->mi.nbuckets)
                        return;
        }
}
static void invalidate_buckets_random(struct cache *ca)
{
        struct bucket *g;
        size_t checked = 0;

        while (!fifo_full(&ca->free_inc)) {
                size_t n = bch_rand_range(ca->mi.nbuckets -
                                          ca->mi.first_bucket) +
                        ca->mi.first_bucket;

                g = ca->buckets + n;

                if (bch_can_invalidate_bucket(ca, g))
                        bch_invalidate_one_bucket(ca, g);

                if (++checked >= ca->mi.nbuckets / 2)
                        return;
        }
}
static void invalidate_buckets(struct cache *ca)
{
        ca->inc_gen_needs_gc = 0;

        switch (ca->mi.replacement) {
        case CACHE_REPLACEMENT_LRU:
                invalidate_buckets_lru(ca);
                break;
        case CACHE_REPLACEMENT_FIFO:
                invalidate_buckets_fifo(ca);
                break;
        case CACHE_REPLACEMENT_RANDOM:
                invalidate_buckets_random(ca);
                break;
        }
}
static bool __bch_allocator_push(struct cache *ca, long bucket)
{
        if (fifo_push(&ca->free[RESERVE_PRIO], bucket))
                goto success;

        if (fifo_push(&ca->free[RESERVE_MOVINGGC], bucket))
                goto success;

        if (fifo_push(&ca->free[RESERVE_BTREE], bucket))
                goto success;

        if (fifo_push(&ca->free[RESERVE_NONE], bucket))
                goto success;

        return false;
success:
        closure_wake_up(&ca->set->freelist_wait);
        return true;
}
static bool bch_allocator_push(struct cache *ca, long bucket)
{
        bool ret;

        spin_lock(&ca->freelist_lock);
        ret = __bch_allocator_push(ca, bucket);
        if (ret)
                fifo_pop(&ca->free_inc, bucket);
        spin_unlock(&ca->freelist_lock);

        return ret;
}
static void bch_find_empty_buckets(struct cache_set *c, struct cache *ca)
{
        u16 last_seq_ondisk = c->journal.last_seq_ondisk;
        struct bucket *g;

        for_each_bucket(g, ca) {
                struct bucket_mark m = READ_ONCE(g->mark);

                if (is_available_bucket(m) &&
                    !m.cached_sectors &&
                    !m.had_metadata &&
                    (!m.wait_on_journal ||
                     ((s16) last_seq_ondisk - (s16) m.journal_seq >= 0))) {
                        spin_lock(&ca->freelist_lock);

                        bch_mark_alloc_bucket(ca, g, true);
                        g->read_prio = ca->set->prio_clock[READ].hand;
                        g->write_prio = ca->set->prio_clock[WRITE].hand;

                        verify_not_on_freelist(ca, g - ca->buckets);
                        BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));

                        spin_unlock(&ca->freelist_lock);

                        if (fifo_full(&ca->free_inc))
                                break;
                }
        }
}
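/*
 * The (s16) casts above implement a wraparound-safe comparison of 16 bit
 * journal sequence numbers - a standalone illustration with assumed values:
 *
 *	last_seq_ondisk = 3, journal_seq = 65534 (the u16 counter has
 *	recently wrapped):
 *
 *	(u16) (3 - 65534) = 5, and (s16) 5 >= 0
 *
 * so the journal entry that emptied the bucket is correctly seen as already
 * on disk, even though 3 < 65534 as raw integers.
 */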
/**
 * bch_allocator_thread - move buckets from free_inc to reserves
 *
 * The free_inc FIFO is populated by invalidate_buckets(), and
 * the reserves are depleted by bucket allocation. When we run out
 * of free_inc, try to invalidate some buckets and write out
 * prios and gens.
 */
static int bch_allocator_thread(void *arg)
{
        struct cache *ca = arg;
        struct cache_set *c = ca->set;
        int ret;

        set_freezable();

        while (1) {
                /*
                 * First, we pull buckets off of the free_inc list, possibly
                 * issue discards to them, then we add the bucket to a
                 * free list:
                 */
                while (!fifo_empty(&ca->free_inc)) {
                        long bucket = fifo_peek(&ca->free_inc);

                        /*
                         * Don't remove from free_inc until after it's added
                         * to freelist, so gc doesn't miss it while we've
                         * dropped bucket lock
                         */

                        if (ca->mi.discard &&
                            blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
                                blkdev_issue_discard(ca->disk_sb.bdev,
                                        bucket_to_sector(ca, bucket),
                                        ca->mi.bucket_size, GFP_NOIO, 0);

                        while (1) {
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (bch_allocator_push(ca, bucket))
                                        break;

                                if (kthread_should_stop()) {
                                        __set_current_state(TASK_RUNNING);
                                        goto out;
                                }
                                schedule();
                        }

                        __set_current_state(TASK_RUNNING);
                }

                down_read(&c->gc_lock);

                /*
                 * See if we have buckets we can reuse without invalidating them
                 * or forcing a journal commit:
                 */
                bch_find_empty_buckets(c, ca);

                if (fifo_used(&ca->free_inc) * 2 > ca->free_inc.size) {
                        up_read(&c->gc_lock);
                        continue;
                }

                /* We've run out of free buckets! */

                while (!fifo_full(&ca->free_inc)) {
                        if (wait_buckets_available(ca)) {
                                up_read(&c->gc_lock);
                                goto out;
                        }

                        /*
                         * Find some buckets that we can invalidate, either
                         * they're completely unused, or only contain clean data
                         * that's been written back to the backing device or
                         * another cache tier
                         */

                        invalidate_buckets(ca);
                        trace_bcache_alloc_batch(ca, fifo_used(&ca->free_inc),
                                                 ca->free_inc.size);
                }

                up_read(&c->gc_lock);

                /*
                 * free_inc is full of newly-invalidated buckets, must write out
                 * prios and gens before they can be re-used
                 */
                ret = bch_prio_write(ca);
                if (ret) {
                        /*
                         * Emergency read only - allocator thread has to
                         * shut down.
                         *
                         * N.B. we better be going into RO mode, else
                         * allocations would hang indefinitely - whatever
                         * generated the error will have sent us into RO mode.
                         *
                         * Clear out the free_inc freelist so things are
                         * consistent-ish:
                         */
                        spin_lock(&ca->freelist_lock);
                        while (!fifo_empty(&ca->free_inc)) {
                                long bucket;

                                fifo_pop(&ca->free_inc, bucket);
                                bch_mark_free_bucket(ca, ca->buckets + bucket);
                        }
                        spin_unlock(&ca->freelist_lock);
                        goto out;
                }
        }
out:
        /*
         * Avoid a race with bucket_stats_update() trying to wake us up after
         * we've exited:
         */
        synchronize_rcu();
        return 0;
}
/**
 * bch_bucket_alloc - allocate a single bucket from a specific device
 *
 * Returns index of bucket on success, 0 on failure
 */
static size_t bch_bucket_alloc(struct cache *ca, enum alloc_reserve reserve)
{
        struct bucket *g;
        size_t r;

        spin_lock(&ca->freelist_lock);
        if (fifo_pop(&ca->free[RESERVE_NONE], r) ||
            fifo_pop(&ca->free[reserve], r))
                goto out;

        spin_unlock(&ca->freelist_lock);

        trace_bcache_bucket_alloc_fail(ca, reserve);
        return 0;
out:
        verify_not_on_freelist(ca, r);
        spin_unlock(&ca->freelist_lock);

        trace_bcache_bucket_alloc(ca, reserve);

        bch_wake_allocator(ca);

        g = ca->buckets + r;

        g->read_prio = ca->set->prio_clock[READ].hand;
        g->write_prio = ca->set->prio_clock[WRITE].hand;

        return r;
}
static void __bch_bucket_free(struct cache *ca, struct bucket *g)
{
        bch_mark_free_bucket(ca, g);

        g->read_prio = ca->set->prio_clock[READ].hand;
        g->write_prio = ca->set->prio_clock[WRITE].hand;
}
enum bucket_alloc_ret {
        ALLOC_SUCCESS,
        NO_DEVICES,             /* -EROFS */
        FREELIST_EMPTY,         /* Allocator thread not keeping up */
};
static void recalc_alloc_group_weights(struct cache_set *c,
                                       struct cache_group *devs)
{
        struct cache *ca;
        u64 available_buckets = 1; /* avoid a divide by zero... */
        unsigned i;

        for (i = 0; i < devs->nr_devices; i++) {
                ca = devs->d[i].dev;

                devs->d[i].weight = buckets_free_cache(ca);
                available_buckets += devs->d[i].weight;
        }

        for (i = 0; i < devs->nr_devices; i++) {
                const unsigned min_weight = U32_MAX >> 4;
                const unsigned max_weight = U32_MAX;

                devs->d[i].weight =
                        min_weight +
                        div64_u64(devs->d[i].weight *
                                  devs->nr_devices *
                                  (max_weight - min_weight),
                                  available_buckets);
                devs->d[i].weight = min_t(u64, devs->d[i].weight, max_weight);
        }
}
static enum bucket_alloc_ret bch_bucket_alloc_group(struct cache_set *c,
                                                    struct open_bucket *ob,
                                                    enum alloc_reserve reserve,
                                                    unsigned nr_replicas,
                                                    struct cache_group *devs,
                                                    long *caches_used)
{
        enum bucket_alloc_ret ret;
        unsigned fail_idx = -1, i;
        unsigned available = 0;

        BUG_ON(nr_replicas > ARRAY_SIZE(ob->ptrs));

        if (ob->nr_ptrs >= nr_replicas)
                return ALLOC_SUCCESS;

        rcu_read_lock();
        spin_lock(&devs->lock);

        for (i = 0; i < devs->nr_devices; i++)
                available += !test_bit(devs->d[i].dev->dev_idx,
                                       caches_used);

        recalc_alloc_group_weights(c, devs);

        i = devs->cur_device;

        while (ob->nr_ptrs < nr_replicas) {
                struct cache *ca;
                size_t bucket;

                if (!available) {
                        ret = NO_DEVICES;
                        goto err;
                }

                i++;
                i %= devs->nr_devices;

                ret = FREELIST_EMPTY;
                if (i == fail_idx)
                        goto err;

                ca = devs->d[i].dev;

                if (test_bit(ca->dev_idx, caches_used))
                        continue;

                if (fail_idx == -1 &&
                    get_random_int() > devs->d[i].weight)
                        continue;

                bucket = bch_bucket_alloc(ca, reserve);
                if (!bucket) {
                        if (fail_idx == -1)
                                fail_idx = i;
                        continue;
                }

                /*
                 * open_bucket_add_buckets expects new pointers at the head of
                 * the list:
                 */
                memmove(&ob->ptrs[1],
                        &ob->ptrs[0],
                        ob->nr_ptrs * sizeof(ob->ptrs[0]));
                memmove(&ob->ptr_offset[1],
                        &ob->ptr_offset[0],
                        ob->nr_ptrs * sizeof(ob->ptr_offset[0]));
                ob->nr_ptrs++;
                ob->ptrs[0] = (struct bch_extent_ptr) {
                        .gen    = ca->buckets[bucket].mark.gen,
                        .offset = bucket_to_sector(ca, bucket),
                        .dev    = ca->dev_idx,
                };
                ob->ptr_offset[0] = 0;

                __set_bit(ca->dev_idx, caches_used);
                available--;
                devs->cur_device = i;
        }

        ret = ALLOC_SUCCESS;
err:
        EBUG_ON(ret != ALLOC_SUCCESS && reserve == RESERVE_MOVINGGC);
        spin_unlock(&devs->lock);
        rcu_read_unlock();
        return ret;
}
static enum bucket_alloc_ret __bch_bucket_alloc_set(struct cache_set *c,
                                                    struct write_point *wp,
                                                    struct open_bucket *ob,
                                                    unsigned nr_replicas,
                                                    enum alloc_reserve reserve,
                                                    long *caches_used)
{
        /*
         * this should implement policy - for a given type of allocation, decide
         * which devices to allocate from:
         *
         * XXX: switch off wp->type and do something more intelligent here
         */

        /* foreground writes: prefer tier 0: */
        if (wp->group == &c->cache_all)
                bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
                                       &c->cache_tiers[0], caches_used);

        return bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
                                      wp->group, caches_used);
}
static int bch_bucket_alloc_set(struct cache_set *c, struct write_point *wp,
                                struct open_bucket *ob, unsigned nr_replicas,
                                enum alloc_reserve reserve, long *caches_used,
                                struct closure *cl)
{
        bool waiting = false;

        while (1)
                switch (__bch_bucket_alloc_set(c, wp, ob, nr_replicas,
                                               reserve, caches_used)) {
                case ALLOC_SUCCESS:
                        if (waiting)
                                closure_wake_up(&c->freelist_wait);

                        return 0;

                case NO_DEVICES:
                        if (waiting)
                                closure_wake_up(&c->freelist_wait);

                        return -EROFS;

                case FREELIST_EMPTY:
                        trace_bcache_freelist_empty_fail(c,
                                                         reserve, cl);

                        if (!cl)
                                return -ENOSPC;

                        if (waiting)
                                return -EAGAIN;

                        /* Retry allocation after adding ourself to waitlist: */
                        closure_wait(&c->freelist_wait, cl);
                        waiting = true;
                        break;
                default:
                        BUG();
                }
}
/*
 * Open buckets represent one or more buckets (on multiple devices) that are
 * currently being allocated from. They serve two purposes:
 *
 * - They track buckets that have been partially allocated, allowing for
 *   sub-bucket sized allocations - they're used by the sector allocator below
 *
 * - They provide a reference to the buckets they own that mark and sweep GC
 *   can find, until the new allocation has a pointer to it inserted into the
 *   btree
 *
 * When allocating some space with the sector allocator, the allocation comes
 * with a reference to an open bucket - the caller is required to put that
 * reference _after_ doing the index update that makes its allocation reachable.
 */
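/*
 * Typical caller sequence, sketched (error handling omitted; the btree update
 * step is named loosely, it's whatever index update the caller performs):
 *
 *	ob = bch_alloc_sectors_start(c, wp, nr_replicas, reserve, cl);
 *	bch_alloc_sectors_append_ptrs(c, e, nr_replicas, ob, sectors);
 *	bch_alloc_sectors_done(c, wp, ob);
 *
 *	... write the data, insert @e into the btree ...
 *
 *	bch_open_bucket_put(c, ob);	// only now drop the GC reference
 *
 * Putting the reference before the index update would let GC reclaim the
 * bucket while the new data was still unreachable.
 */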
static void __bch_open_bucket_put(struct cache_set *c, struct open_bucket *ob)
{
        const struct bch_extent_ptr *ptr;
        struct cache *ca;

        lockdep_assert_held(&c->open_buckets_lock);

        rcu_read_lock();
        open_bucket_for_each_online_device(c, ob, ptr, ca)
                bch_mark_alloc_bucket(ca, PTR_BUCKET(ca, ptr), false);
        rcu_read_unlock();

        ob->nr_ptrs = 0;

        list_move(&ob->list, &c->open_buckets_free);
        c->open_buckets_nr_free++;
        closure_wake_up(&c->open_buckets_wait);
}
void bch_open_bucket_put(struct cache_set *c, struct open_bucket *b)
{
        if (atomic_dec_and_test(&b->pin)) {
                spin_lock(&c->open_buckets_lock);
                __bch_open_bucket_put(c, b);
                spin_unlock(&c->open_buckets_lock);
        }
}
static struct open_bucket *bch_open_bucket_get(struct cache_set *c,
                                               unsigned nr_reserved,
                                               struct closure *cl)
{
        struct open_bucket *ret;

        spin_lock(&c->open_buckets_lock);

        if (c->open_buckets_nr_free > nr_reserved) {
                BUG_ON(list_empty(&c->open_buckets_free));
                ret = list_first_entry(&c->open_buckets_free,
                                       struct open_bucket, list);
                list_move(&ret->list, &c->open_buckets_open);
                BUG_ON(ret->nr_ptrs);

                atomic_set(&ret->pin, 1); /* XXX */
                ret->has_full_ptrs = false;

                c->open_buckets_nr_free--;
                trace_bcache_open_bucket_alloc(c, cl);
        } else {
                trace_bcache_open_bucket_alloc_fail(c, cl);

                if (cl) {
                        closure_wait(&c->open_buckets_wait, cl);
                        ret = ERR_PTR(-EAGAIN);
                } else
                        ret = ERR_PTR(-ENOSPC);
        }

        spin_unlock(&c->open_buckets_lock);

        return ret;
}
static unsigned ob_ptr_sectors_free(struct open_bucket *ob,
                                    struct cache_member_rcu *mi,
                                    struct bch_extent_ptr *ptr)
{
        unsigned i = ptr - ob->ptrs;
        unsigned bucket_size = mi->m[ptr->dev].bucket_size;
        unsigned used = (ptr->offset & (bucket_size - 1)) +
                ob->ptr_offset[i];

        BUG_ON(used > bucket_size);

        return bucket_size - used;
}
static unsigned open_bucket_sectors_free(struct cache_set *c,
                                         struct open_bucket *ob,
                                         unsigned nr_replicas)
{
        struct cache_member_rcu *mi = cache_member_info_get(c);
        unsigned i, sectors_free = UINT_MAX;

        BUG_ON(nr_replicas > ob->nr_ptrs);

        for (i = 0; i < nr_replicas; i++)
                sectors_free = min(sectors_free,
                                   ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]));

        cache_member_info_put();

        return sectors_free != UINT_MAX ? sectors_free : 0;
}
static void open_bucket_copy_unused_ptrs(struct cache_set *c,
                                         struct open_bucket *new,
                                         struct open_bucket *old)
{
        struct cache_member_rcu *mi = cache_member_info_get(c);
        unsigned i;

        for (i = 0; i < old->nr_ptrs; i++)
                if (ob_ptr_sectors_free(old, mi, &old->ptrs[i])) {
                        struct bch_extent_ptr tmp = old->ptrs[i];

                        tmp.offset += old->ptr_offset[i];
                        new->ptrs[new->nr_ptrs] = tmp;
                        new->ptr_offset[new->nr_ptrs] = 0;
                        new->nr_ptrs++;
                }

        cache_member_info_put();
}
static void verify_not_stale(struct cache_set *c, const struct open_bucket *ob)
{
#ifdef CONFIG_BCACHE_DEBUG
        const struct bch_extent_ptr *ptr;
        struct cache *ca;

        rcu_read_lock();
        open_bucket_for_each_online_device(c, ob, ptr, ca)
                BUG_ON(ptr_stale(ca, ptr));
        rcu_read_unlock();
#endif
}
/* Sector allocator */
static struct open_bucket *lock_writepoint(struct cache_set *c,
                                           struct write_point *wp)
{
        struct open_bucket *ob;

        while ((ob = ACCESS_ONCE(wp->b))) {
                mutex_lock(&ob->lock);
                if (wp->b == ob)
                        break;

                mutex_unlock(&ob->lock);
        }

        return ob;
}
static int open_bucket_add_buckets(struct cache_set *c,
                                   struct write_point *wp,
                                   struct open_bucket *ob,
                                   unsigned nr_replicas,
                                   enum alloc_reserve reserve,
                                   struct closure *cl)
{
        long caches_used[BITS_TO_LONGS(BCH_SB_MEMBERS_MAX)];
        int i, dst;

        /*
         * We might be allocating pointers to add to an existing extent
         * (tiering/copygc/migration) - if so, some of the pointers in our
         * existing open bucket might duplicate devices we already have. This is
         * moderately annoying.
         */

        /* Short circuit all the fun stuff if possible: */
        if (ob->nr_ptrs >= nr_replicas)
                return 0;

        memset(caches_used, 0, sizeof(caches_used));

        /*
         * Shuffle pointers to devices we already have to the end:
         * bch_bucket_alloc_set() will add new pointers to the start of @b, and
         * bch_alloc_sectors_done() will add the first nr_replicas ptrs to @e:
         */
        for (i = dst = ob->nr_ptrs - 1; i >= 0; --i)
                if (__test_and_set_bit(ob->ptrs[i].dev, caches_used)) {
                        /* duplicate device - move it to the end: */
                        swap(ob->ptrs[i], ob->ptrs[dst]);
                        swap(ob->ptr_offset[i], ob->ptr_offset[dst]);
                        --dst;
                }

        return bch_bucket_alloc_set(c, wp, ob, nr_replicas,
                                    reserve, caches_used, cl);
}
/*
 * Get us an open_bucket we can allocate from, return with it locked:
 */
struct open_bucket *bch_alloc_sectors_start(struct cache_set *c,
                                            struct write_point *wp,
                                            unsigned nr_replicas,
                                            enum alloc_reserve reserve,
                                            struct closure *cl)
{
        struct open_bucket *ob;
        unsigned open_buckets_reserved = wp == &c->btree_write_point
                ? 0 : BTREE_NODE_RESERVE;
        int ret;

        BUG_ON(!nr_replicas);
retry:
        ob = lock_writepoint(c, wp);

        /*
         * If ob->sectors_free == 0, one or more of the buckets ob points to is
         * full. We can't drop pointers from an open bucket - garbage collection
         * still needs to find them; instead, we must allocate a new open bucket
         * and copy any pointers to non-full buckets into the new open bucket.
         */
        if (!ob || ob->has_full_ptrs) {
                struct open_bucket *new_ob;

                new_ob = bch_open_bucket_get(c, open_buckets_reserved, cl);
                if (IS_ERR(new_ob)) {
                        if (ob)
                                mutex_unlock(&ob->lock);
                        return new_ob;
                }

                mutex_lock(&new_ob->lock);

                /*
                 * We point the write point at the open_bucket before doing the
                 * allocation to avoid a race with shutdown:
                 */
                if (cmpxchg(&wp->b, ob, new_ob) != ob) {
                        /* We raced: */
                        mutex_unlock(&new_ob->lock);
                        bch_open_bucket_put(c, new_ob);

                        if (ob)
                                mutex_unlock(&ob->lock);
                        goto retry;
                }

                if (ob) {
                        open_bucket_copy_unused_ptrs(c, new_ob, ob);
                        mutex_unlock(&ob->lock);
                        bch_open_bucket_put(c, ob);
                }

                ob = new_ob;
        }

        ret = open_bucket_add_buckets(c, wp, ob, nr_replicas,
                                      reserve, cl);
        if (ret) {
                mutex_unlock(&ob->lock);
                return ERR_PTR(ret);
        }

        ob->sectors_free = open_bucket_sectors_free(c, ob, nr_replicas);

        BUG_ON(!ob->sectors_free);
        verify_not_stale(c, ob);

        return ob;
}
/*
 * Append pointers to the space we just allocated to @e, and mark @sectors space
 * as allocated out of @ob
 */
void bch_alloc_sectors_append_ptrs(struct cache_set *c, struct bkey_i_extent *e,
                                   unsigned nr_replicas, struct open_bucket *ob,
                                   unsigned sectors)
{
        struct bch_extent_ptr tmp, *ptr;
        struct cache *ca;
        bool has_data = false;
        unsigned i;

        /*
         * We're keeping any existing pointer k has, and appending new pointers:
         * __bch_write() will only write to the pointers we add here:
         */

        /*
         * XXX: don't add pointers to devices @e already has
         */
        BUG_ON(nr_replicas > ob->nr_ptrs);
        BUG_ON(sectors > ob->sectors_free);

        /* didn't use all the ptrs: */
        if (nr_replicas < ob->nr_ptrs)
                has_data = true;

        for (i = 0; i < nr_replicas; i++) {
                EBUG_ON(bch_extent_has_device(extent_i_to_s_c(e), ob->ptrs[i].dev));

                tmp = ob->ptrs[i];
                tmp.cached = bkey_extent_is_cached(&e->k);
                tmp.offset += ob->ptr_offset[i];
                extent_ptr_append(e, tmp);

                ob->ptr_offset[i] += sectors;
        }

        rcu_read_lock();
        open_bucket_for_each_online_device(c, ob, ptr, ca)
                this_cpu_add(*ca->sectors_written, sectors);
        rcu_read_unlock();
}
/*
 * Release the write point's lock on @ob: if @ob still has unused space, pin it
 * so the write point can keep allocating from it; otherwise detach it from @wp
 */
void bch_alloc_sectors_done(struct cache_set *c, struct write_point *wp,
                            struct open_bucket *ob)
{
        struct cache_member_rcu *mi = cache_member_info_get(c);
        bool has_data = false;
        unsigned i;

        for (i = 0; i < ob->nr_ptrs; i++) {
                if (!ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]))
                        ob->has_full_ptrs = true;
                else
                        has_data = true;
        }

        cache_member_info_put();

        if (likely(has_data))
                atomic_inc(&ob->pin);
        else
                BUG_ON(xchg(&wp->b, NULL) != ob);

        mutex_unlock(&ob->lock);
}
/*
 * Allocates some space in the cache to write to, sets @e to point to the newly
 * allocated space, and updates k->size and k->offset (to point to the
 * end of the newly allocated space).
 *
 * May allocate fewer sectors than @sectors, k->size indicates how many
 * sectors were actually allocated.
 *
 * Return codes:
 * - -EAGAIN: closure was added to waitlist
 * - -ENOSPC: out of space and no closure provided
 *
 * @c  - cache set.
 * @wp - write point to use for allocating sectors.
 * @e  - key to return the allocated space information.
 * @cl - closure to wait for a bucket
 */
struct open_bucket *bch_alloc_sectors(struct cache_set *c,
                                      struct write_point *wp,
                                      struct bkey_i_extent *e,
                                      unsigned nr_replicas,
                                      enum alloc_reserve reserve,
                                      struct closure *cl)
{
        struct open_bucket *ob;

        ob = bch_alloc_sectors_start(c, wp, nr_replicas, reserve, cl);
        if (IS_ERR_OR_NULL(ob))
                return ob;

        if (e->k.size > ob->sectors_free)
                bch_key_resize(&e->k, ob->sectors_free);

        bch_alloc_sectors_append_ptrs(c, e, nr_replicas, ob, e->k.size);

        bch_alloc_sectors_done(c, wp, ob);

        return ob;
}
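/*
 * Example of the all-in-one helper above (illustrative sketch; assumes @e was
 * set up with e->k.size = number of sectors wanted):
 *
 *	ob = bch_alloc_sectors(c, wp, e, 1, RESERVE_NONE, cl);
 *	if (IS_ERR_OR_NULL(ob))
 *		return ob;	// -EAGAIN: wait on @cl and retry
 *
 *	// e->k.size now reflects what was actually allocated - possibly less
 *	// than requested - and @e carries pointers to the new space.
 */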
/* Startup/shutdown (ro/rw): */
static void bch_recalc_capacity(struct cache_set *c)
{
        struct cache_group *tier = c->cache_tiers + ARRAY_SIZE(c->cache_tiers);
        struct cache *ca;
        u64 total_capacity, capacity = 0, reserved_sectors = 0;
        unsigned long ra_pages = 0;
        unsigned i, j;

        rcu_read_lock();
        for_each_cache_rcu(ca, c, i) {
                struct backing_dev_info *bdi =
                        blk_get_backing_dev_info(ca->disk_sb.bdev);

                ra_pages += bdi->ra_pages;
        }

        c->bdi.ra_pages = ra_pages;

        /*
         * Capacity of the cache set is the capacity of all the devices in the
         * slowest (highest) tier - we don't include lower tier devices.
         */
        for (tier = c->cache_tiers + ARRAY_SIZE(c->cache_tiers) - 1;
             tier > c->cache_tiers && !tier->nr_devices;
             --tier)
                ;

        group_for_each_cache_rcu(ca, tier, i) {
                size_t reserve = 0;

                /*
                 * We need to reserve buckets (from the number
                 * of currently available buckets) against
                 * foreground writes so that mainly copygc can
                 * make forward progress.
                 *
                 * We need enough to refill the various reserves
                 * from scratch - copygc will use its entire
                 * reserve all at once, then run again when
                 * its reserve is refilled (from the formerly
                 * available buckets).
                 *
                 * This reserve is just used when considering if
                 * allocations for foreground writes must wait -
                 * not -ENOSPC calculations.
                 */
                for (j = 0; j < RESERVE_NONE; j++)
                        reserve += ca->free[j].size;

                reserve += ca->free_inc.size;

                reserve += ARRAY_SIZE(c->write_points);

                reserve += 1;   /* tiering write point */
                reserve += 1;   /* btree write point */

                reserved_sectors += reserve << ca->bucket_bits;

                capacity += (ca->mi.nbuckets -
                             ca->mi.first_bucket) <<
                        ca->bucket_bits;
        }
        rcu_read_unlock();

        total_capacity = capacity;

        capacity *= (100 - c->opts.gc_reserve_percent);
        capacity = div64_u64(capacity, 100);

        BUG_ON(capacity + reserved_sectors > total_capacity);

        c->capacity = capacity;

        if (c->capacity) {
                bch_io_timer_add(&c->io_clock[READ],
                                 &c->prio_clock[READ].rescale);
                bch_io_timer_add(&c->io_clock[WRITE],
                                 &c->prio_clock[WRITE].rescale);
        } else {
                bch_io_timer_del(&c->io_clock[READ],
                                 &c->prio_clock[READ].rescale);
                bch_io_timer_del(&c->io_clock[WRITE],
                                 &c->prio_clock[WRITE].rescale);
        }

        /* Wake up in case someone was waiting for buckets */
        closure_wake_up(&c->freelist_wait);
}
static void bch_stop_write_point(struct cache *ca,
                                 struct write_point *wp)
{
        struct cache_set *c = ca->set;
        struct open_bucket *ob;
        struct bch_extent_ptr *ptr;

        ob = lock_writepoint(c, wp);
        if (!ob)
                return;

        for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
                if (ptr->dev == ca->dev_idx)
                        goto found;

        mutex_unlock(&ob->lock);
        return;
found:
        BUG_ON(xchg(&wp->b, NULL) != ob);
        mutex_unlock(&ob->lock);

        /* Drop writepoint's ref: */
        bch_open_bucket_put(c, ob);
}
static bool bch_dev_has_open_write_point(struct cache *ca)
{
        struct cache_set *c = ca->set;
        struct bch_extent_ptr *ptr;
        struct open_bucket *ob;

        for (ob = c->open_buckets;
             ob < c->open_buckets + ARRAY_SIZE(c->open_buckets);
             ob++)
                if (atomic_read(&ob->pin)) {
                        mutex_lock(&ob->lock);
                        for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
                                if (ptr->dev == ca->dev_idx) {
                                        mutex_unlock(&ob->lock);
                                        return true;
                                }
                        mutex_unlock(&ob->lock);
                }

        return false;
}
/* device goes ro: */
void bch_cache_allocator_stop(struct cache *ca)
{
        struct cache_set *c = ca->set;
        struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
        struct task_struct *p;
        struct closure cl;
        unsigned i;

        closure_init_stack(&cl);

        /* First, remove device from allocation groups: */

        bch_cache_group_remove_cache(tier, ca);
        bch_cache_group_remove_cache(&c->cache_all, ca);

        bch_recalc_capacity(c);

        /*
         * Stopping the allocator thread comes after removing from allocation
         * groups, else pending allocations will hang:
         */

        p = ca->alloc_thread;
        ca->alloc_thread = NULL;

        /*
         * We need an rcu barrier between setting ca->alloc_thread = NULL and
         * the thread shutting down to avoid a race with bucket_stats_update() -
         * the allocator thread itself does a synchronize_rcu() on exit.
         *
         * XXX: it would be better to have the rcu barrier be asynchronous
         * instead of blocking us here
         */
        synchronize_rcu();

        if (p)
                kthread_stop(p);

        /* Next, close write points that point to this device... */

        for (i = 0; i < ARRAY_SIZE(c->write_points); i++)
                bch_stop_write_point(ca, &c->write_points[i]);

        bch_stop_write_point(ca, &ca->copygc_write_point);
        bch_stop_write_point(ca, &c->promote_write_point);
        bch_stop_write_point(ca, &ca->tiering_write_point);
        bch_stop_write_point(ca, &c->migration_write_point);
        bch_stop_write_point(ca, &c->btree_write_point);

        mutex_lock(&c->btree_reserve_cache_lock);
        while (c->btree_reserve_cache_nr) {
                struct btree_alloc *a =
                        &c->btree_reserve_cache[--c->btree_reserve_cache_nr];

                bch_open_bucket_put(c, a->ob);
        }
        mutex_unlock(&c->btree_reserve_cache_lock);

        /* Avoid deadlocks.. */

        closure_wake_up(&c->freelist_wait);
        wake_up(&c->journal.wait);

        /* Now wait for any in flight writes: */

        while (1) {
                closure_wait(&c->open_buckets_wait, &cl);

                if (!bch_dev_has_open_write_point(ca)) {
                        closure_wake_up(&c->open_buckets_wait);
                        break;
                }

                closure_sync(&cl);
        }
}
/*
 * Startup the allocator thread for transition to RW mode:
 */
int bch_cache_allocator_start(struct cache *ca)
{
        struct cache_set *c = ca->set;
        struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
        struct task_struct *k;

        /*
         * allocator thread already started?
         */
        if (ca->alloc_thread)
                return 0;

        k = kthread_create(bch_allocator_thread, ca, "bcache_allocator");
        if (IS_ERR(k))
                return PTR_ERR(k);

        ca->alloc_thread = k;

        bch_cache_group_add_cache(tier, ca);
        bch_cache_group_add_cache(&c->cache_all, ca);

        bch_recalc_capacity(c);

        /*
         * Don't wake up allocator thread until after adding device to
         * allocator groups - otherwise, alloc thread could get a spurious
         * -EROFS due to prio_write() -> journal_meta() not finding any devices:
         */
        wake_up_process(k);
        return 0;
}
void bch_open_buckets_init(struct cache_set *c)
{
        unsigned i;

        INIT_LIST_HEAD(&c->open_buckets_open);
        INIT_LIST_HEAD(&c->open_buckets_free);
        spin_lock_init(&c->open_buckets_lock);
        bch_prio_timer_init(c, READ);
        bch_prio_timer_init(c, WRITE);

        /* open bucket 0 is a sentinel NULL: */
        mutex_init(&c->open_buckets[0].lock);
        INIT_LIST_HEAD(&c->open_buckets[0].list);

        for (i = 1; i < ARRAY_SIZE(c->open_buckets); i++) {
                mutex_init(&c->open_buckets[i].lock);
                c->open_buckets_nr_free++;
                list_add(&c->open_buckets[i].list, &c->open_buckets_free);
        }

        spin_lock_init(&c->cache_all.lock);

        for (i = 0; i < ARRAY_SIZE(c->write_points); i++) {
                c->write_points[i].throttle = true;
                c->write_points[i].group = &c->cache_tiers[0];
        }

        for (i = 0; i < ARRAY_SIZE(c->cache_tiers); i++)
                spin_lock_init(&c->cache_tiers[i].lock);

        c->promote_write_point.group = &c->cache_tiers[0];

        c->migration_write_point.group = &c->cache_all;

        c->btree_write_point.group = &c->cache_all;

        c->pd_controllers_update_seconds = 5;
        INIT_DELAYED_WORK(&c->pd_controllers_update, pd_controllers_update);

        spin_lock_init(&c->foreground_write_pd_lock);
        bch_pd_controller_init(&c->foreground_write_pd);
        /*
         * We do not want the write rate to have an effect on the computed
         * rate, for two reasons:
         *
         * We do not call bch_ratelimit_delay() at all if the write rate
         * exceeds 1GB/s. In this case, the PD controller will think we are
         * not "keeping up" and not change the rate.
         */
        c->foreground_write_pd.backpressure = 0;
        init_timer(&c->foreground_write_wakeup);

        c->foreground_write_wakeup.data = (unsigned long) c;
        c->foreground_write_wakeup.function = bch_wake_delayed_writes;
}