1 /*
2  * Primary bucket allocation code
3  *
4  * Copyright 2012 Google, Inc.
5  *
6  * Allocation in bcache is done in terms of buckets:
7  *
8  * Each bucket has an associated 8 bit gen; this gen corresponds to the gen in
9  * btree pointers - they must match for the pointer to be considered valid.
10  *
11  * Thus (assuming a bucket has no dirty data or metadata in it) we can reuse a
12  * bucket simply by incrementing its gen.
13  *
14  * The gens (along with the priorities; it's really the gens that are important
15  * but the code is named as if it's the priorities) are written in an arbitrary list
16  * of buckets on disk, with a pointer to them in the journal header.
17  *
18  * When we invalidate a bucket, we have to write its new gen to disk and wait
19  * for that write to complete before we use it - otherwise after a crash we
20  * could have pointers that appeared to be good but pointed to data that had
21  * been overwritten.
22  *
23  * Since the gens and priorities are all stored contiguously on disk, we can
24  * batch this up: We fill up the free_inc list with freshly invalidated buckets,
25  * call prio_write(), and when prio_write() finishes we pull buckets off the
26  * free_inc list and optionally discard them.
27  *
28  * free_inc isn't the only freelist - if it was, we'd often have to sleep while
29  * priorities and gens were being written before we could allocate. c->free is a
30  * smaller freelist, and buckets on that list are always ready to be used.
31  *
32  * If we've got discards enabled, that happens when a bucket moves from the
33  * free_inc list to the free list.
34  *
35  * It's important to ensure that gens don't wrap around - with respect to
36  * either the oldest gen in the btree or the gen on disk. This is quite
37  * difficult to do in practice, but we explicitly guard against it anyway - if
38  * a bucket is in danger of wrapping around we simply skip invalidating it that
39  * time around, and we garbage collect or rewrite the priorities sooner than we
40  * would have otherwise.
41  *
42  * bch_bucket_alloc() allocates a single bucket from a specific cache.
43  *
44  * bch_bucket_alloc_set() allocates one or more buckets from different caches
45  * out of a cache set.
46  *
47  * invalidate_buckets() drives all the processes described above. It's called
48  * from bch_bucket_alloc() and a few other places that need to make sure free
49  * buckets are ready.
50  *
51  * invalidate_buckets_(lru|fifo|random)() find buckets that are available to
52  * be invalidated, and then invalidate them and stick them on the free_inc
53  * list - in lru, fifo, or random order.
54  */
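
/*
 * As a rough illustration of the gen check described above (ptr_stale() and
 * PTR_BUCKET() are the helpers actually used further down in this file; treat
 * this as a sketch, not their authoritative definitions):
 *
 *	a pointer is stale - and must be ignored - once the bucket's gen has
 *	moved past the gen stored in the pointer:
 *
 *	    stale = PTR_BUCKET(ca, ptr)->mark.gen != ptr->gen;
 *
 * so bumping a bucket's gen atomically invalidates every existing pointer
 * into that bucket.
 */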
55
56 #include "bcache.h"
57 #include "alloc.h"
58 #include "btree_update.h"
59 #include "buckets.h"
60 #include "checksum.h"
61 #include "clock.h"
62 #include "debug.h"
63 #include "error.h"
64 #include "extents.h"
65 #include "io.h"
66 #include "journal.h"
67 #include "super.h"
68
69 #include <linux/blkdev.h>
70 #include <linux/kthread.h>
71 #include <linux/math64.h>
72 #include <linux/random.h>
73 #include <linux/rcupdate.h>
74 #include <trace/events/bcache.h>
75
76 static size_t bch_bucket_alloc(struct cache *, enum alloc_reserve);
77 static void __bch_bucket_free(struct cache *, struct bucket *);
78
79 /* Allocation groups: */
80
81 void bch_cache_group_remove_cache(struct cache_group *grp, struct cache *ca)
82 {
83         unsigned i;
84
85         spin_lock(&grp->lock);
86
87         for (i = 0; i < grp->nr_devices; i++)
88                 if (rcu_access_pointer(grp->d[i].dev) == ca) {
89                         grp->nr_devices--;
90                         memmove(&grp->d[i],
91                                 &grp->d[i + 1],
92                                 (grp->nr_devices - i) * sizeof(grp->d[0]));
93                         break;
94                 }
95
96         spin_unlock(&grp->lock);
97 }
98
99 void bch_cache_group_add_cache(struct cache_group *grp, struct cache *ca)
100 {
101         unsigned i;
102
103         spin_lock(&grp->lock);
104         for (i = 0; i < grp->nr_devices; i++)
105                 if (rcu_access_pointer(grp->d[i].dev) == ca)
106                         goto out;
107
108         BUG_ON(grp->nr_devices >= MAX_CACHES_PER_SET);
109
110         rcu_assign_pointer(grp->d[grp->nr_devices++].dev, ca);
111 out:
112         spin_unlock(&grp->lock);
113 }
114
115 /* Ratelimiting/PD controllers */
116
117 static void pd_controllers_update(struct work_struct *work)
118 {
119         struct cache_set *c = container_of(to_delayed_work(work),
120                                            struct cache_set,
121                                            pd_controllers_update);
122         struct cache *ca;
123         unsigned iter;
124         int i;
125
126         /* All units are in bytes */
127         u64 tier_size[CACHE_TIERS];
128         u64 tier_free[CACHE_TIERS];
129         u64 tier_dirty[CACHE_TIERS];
130         u64 tier0_can_free = 0;
131
132         memset(tier_size, 0, sizeof(tier_size));
133         memset(tier_free, 0, sizeof(tier_free));
134         memset(tier_dirty, 0, sizeof(tier_dirty));
135
136         rcu_read_lock();
137         for (i = CACHE_TIERS - 1; i >= 0; --i)
138                 group_for_each_cache_rcu(ca, &c->cache_tiers[i], iter) {
139                         struct bucket_stats_cache stats = bch_bucket_stats_read_cache(ca);
140                         unsigned bucket_bits = ca->bucket_bits + 9;
141
142                         /*
143                          * Bytes of internal fragmentation, which can be
144                          * reclaimed by copy GC
145                          */
146                         s64 fragmented = ((stats.buckets_dirty +
147                                            stats.buckets_cached) <<
148                                           bucket_bits) -
149                                 ((stats.sectors_dirty +
150                                   stats.sectors_cached) << 9);
151
152                         u64 dev_size = (ca->mi.nbuckets -
153                                         ca->mi.first_bucket) << bucket_bits;
154
155                         u64 free = __buckets_free_cache(ca, stats) << bucket_bits;
156
157                         if (fragmented < 0)
158                                 fragmented = 0;
159
160                         bch_pd_controller_update(&ca->moving_gc_pd,
161                                                  free, fragmented, -1);
162
163                         if (i == 0)
164                                 tier0_can_free += fragmented;
165
166                         tier_size[i] += dev_size;
167                         tier_free[i] += free;
168                         tier_dirty[i] += stats.buckets_dirty << bucket_bits;
169                 }
170         rcu_read_unlock();
171
172         if (tier_size[1]) {
173                 u64 target = div_u64(tier_size[0] * c->tiering_percent, 100);
174
175                 tier0_can_free = max_t(s64, 0, tier_dirty[0] - target);
176
177                 bch_pd_controller_update(&c->tiering_pd,
178                                          target,
179                                          tier_dirty[0],
180                                          -1);
181         }
182
183         /*
184          * Throttle foreground writes if tier 0 is running out of free buckets,
185          * and either tiering or copygc can free up space (but don't take both
186          * into account).
187          *
188          * Target will be small if there isn't any work to do - we don't want to
189          * throttle foreground writes if we currently have all the free space
190          * we're ever going to have.
191          *
192          * Otherwise, if there's work to do, try to keep 20% of tier0 available
193          * for foreground writes.
194          */
195         bch_pd_controller_update(&c->foreground_write_pd,
196                                  min(tier0_can_free,
197                                      div_u64(tier_size[0] *
198                                              c->foreground_target_percent,
199                                              100)),
200                                  tier_free[0],
201                                  -1);
202
203         schedule_delayed_work(&c->pd_controllers_update,
204                               c->pd_controllers_update_seconds * HZ);
205 }
206
207 /*
208  * Bucket priorities/gens:
209  *
210  * For each bucket, we store on disk its:
211  *  - 8 bit gen
212  *  - 16 bit priority
213  *
214  * See alloc.c for an explanation of the gen. The priority is used to implement
215  * lru (and in the future other) cache replacement policies; for most purposes
216  * it's just an opaque integer.
217  *
218  * The gens and the priorities don't have a whole lot to do with each other, and
219  * it's actually the gens that must be written out at specific times - it's no
220  * big deal if the priorities don't get written, if we lose them we just reuse
221  * buckets in suboptimal order.
222  *
223  * On disk they're stored in a packed array, in as many buckets as are required
224  * to fit them all. The buckets we use to store them form a list; the journal
225  * header points to the first bucket, the first bucket points to the second
226  * bucket, et cetera.
227  *
228  * This code is used by the allocation code; periodically (whenever it runs out
229  * of buckets to allocate from) the allocation code will invalidate some
230  * buckets, but it can't use those buckets until their new gens are safely on
231  * disk.
232  */
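
/*
 * Rough picture of the on-disk layout described above (field names follow
 * struct prio_set / struct bucket_disk as used below; sizes are illustrative):
 *
 *	journal header
 *	    |
 *	    v
 *	bucket A: { magic, csum, next_bucket -> B, data[prios_per_bucket(ca)] }
 *	bucket B: { magic, csum, next_bucket -> C, data[...] }
 *	...
 *
 * bch_prio_read() below simply walks the next_bucket chain until it has read
 * an entry for every bucket on the device.
 */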
233
234 static int prio_io(struct cache *ca, uint64_t bucket, int op)
235 {
236         bio_init(ca->bio_prio);
237         bio_set_op_attrs(ca->bio_prio, op, REQ_SYNC|REQ_META);
238
239         ca->bio_prio->bi_max_vecs       = bucket_pages(ca);
240         ca->bio_prio->bi_io_vec         = ca->bio_prio->bi_inline_vecs;
241         ca->bio_prio->bi_iter.bi_sector = bucket * ca->mi.bucket_size;
242         ca->bio_prio->bi_bdev           = ca->disk_sb.bdev;
243         ca->bio_prio->bi_iter.bi_size   = bucket_bytes(ca);
244         bch_bio_map(ca->bio_prio, ca->disk_buckets);
245
246         return submit_bio_wait(ca->bio_prio);
247 }
248
249 static int bch_prio_write(struct cache *ca)
250 {
251         struct cache_set *c = ca->set;
252         struct journal *j = &c->journal;
253         struct journal_res res = { 0 };
254         bool need_new_journal_entry;
255         int i, ret;
256
257         trace_bcache_prio_write_start(ca);
258
259         atomic64_add(ca->mi.bucket_size * prio_buckets(ca),
260                      &ca->meta_sectors_written);
261
262         for (i = prio_buckets(ca) - 1; i >= 0; --i) {
263                 struct bucket *g;
264                 struct prio_set *p = ca->disk_buckets;
265                 struct bucket_disk *d = p->data;
266                 struct bucket_disk *end = d + prios_per_bucket(ca);
267                 size_t r;
268
269                 for (r = i * prios_per_bucket(ca);
270                      r < ca->mi.nbuckets && d < end;
271                      r++, d++) {
272                         g = ca->buckets + r;
273                         d->read_prio = cpu_to_le16(g->read_prio);
274                         d->write_prio = cpu_to_le16(g->write_prio);
275                         d->gen = ca->buckets[r].mark.gen;
276                 }
277
278                 p->next_bucket  = cpu_to_le64(ca->prio_buckets[i + 1]);
279                 p->magic        = cpu_to_le64(pset_magic(&c->disk_sb));
280
281                 SET_PSET_CSUM_TYPE(p, c->opts.metadata_checksum);
282                 p->csum         = cpu_to_le64(bch_checksum(PSET_CSUM_TYPE(p),
283                                                            &p->magic,
284                                                            bucket_bytes(ca) - 8));
285
286                 spin_lock(&ca->prio_buckets_lock);
287                 r = bch_bucket_alloc(ca, RESERVE_PRIO);
288                 BUG_ON(!r);
289
290                 /*
291                  * Record the new prio bucket before dropping prio_buckets_lock,
292                  * to guard against it getting gc'd from under us
293                  */
294                 ca->prio_buckets[i] = r;
295                 bch_mark_metadata_bucket(ca, ca->buckets + r, false);
296                 spin_unlock(&ca->prio_buckets_lock);
297
298                 ret = prio_io(ca, r, REQ_OP_WRITE);
299                 if (cache_fatal_io_err_on(ret, ca,
300                                           "prio write to bucket %zu", r) ||
301                     bch_meta_write_fault("prio"))
302                         return ret;
303         }
304
305         spin_lock(&j->lock);
306         j->prio_buckets[ca->sb.nr_this_dev] = cpu_to_le64(ca->prio_buckets[0]);
307         j->nr_prio_buckets = max_t(unsigned,
308                                    ca->sb.nr_this_dev + 1,
309                                    j->nr_prio_buckets);
310         spin_unlock(&j->lock);
311
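	/*
	 * Loosely: the journal entry we reserve below may have been opened
	 * before we bumped nr_prio_buckets above, in which case it won't yet
	 * cover this device's prio bucket pointer - keep flushing journal
	 * entries until one that does has made it to disk.
	 */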
312         do {
313                 unsigned u64s = jset_u64s(0);
314
315                 ret = bch_journal_res_get(j, &res, u64s, u64s);
316                 if (ret)
317                         return ret;
318
319                 need_new_journal_entry = j->buf[res.idx].nr_prio_buckets <
320                         ca->sb.nr_this_dev + 1;
321                 bch_journal_res_put(j, &res);
322
323                 ret = bch_journal_flush_seq(j, res.seq);
324                 if (ret)
325                         return ret;
326         } while (need_new_journal_entry);
327
328         /*
329          * Don't want the old priorities to get garbage collected until after we
330          * finish writing the new ones, and they're journalled
331          */
332
333         spin_lock(&ca->prio_buckets_lock);
334
335         for (i = 0; i < prio_buckets(ca); i++) {
336                 if (ca->prio_last_buckets[i])
337                         __bch_bucket_free(ca,
338                                 &ca->buckets[ca->prio_last_buckets[i]]);
339
340                 ca->prio_last_buckets[i] = ca->prio_buckets[i];
341         }
342
343         spin_unlock(&ca->prio_buckets_lock);
344
345         trace_bcache_prio_write_end(ca);
346         return 0;
347 }
348
349 int bch_prio_read(struct cache *ca)
350 {
351         struct cache_set *c = ca->set;
352         struct prio_set *p = ca->disk_buckets;
353         struct bucket_disk *d = p->data + prios_per_bucket(ca), *end = d;
354         struct bucket_mark new;
355         unsigned bucket_nr = 0;
356         u64 bucket, expect, got;
357         size_t b;
358         int ret = 0;
359
360         spin_lock(&c->journal.lock);
361         bucket = le64_to_cpu(c->journal.prio_buckets[ca->sb.nr_this_dev]);
362         spin_unlock(&c->journal.lock);
363
364         /*
365          * If the device hasn't been used yet, there won't be a prio bucket ptr
366          */
367         if (!bucket)
368                 return 0;
369
370         unfixable_fsck_err_on(bucket < ca->mi.first_bucket ||
371                               bucket >= ca->mi.nbuckets, c,
372                               "bad prio bucket %llu", bucket);
373
374         for (b = 0; b < ca->mi.nbuckets; b++, d++) {
375                 if (d == end) {
376                         ca->prio_last_buckets[bucket_nr] = bucket;
377                         bucket_nr++;
378
379                         ret = prio_io(ca, bucket, REQ_OP_READ);
380                         if (cache_fatal_io_err_on(ret, ca,
381                                         "prio read from bucket %llu",
382                                         bucket) ||
383                             bch_meta_read_fault("prio"))
384                                 return -EIO;
385
386                         got = le64_to_cpu(p->magic);
387                         expect = pset_magic(&c->disk_sb);
388                         unfixable_fsck_err_on(got != expect, c,
389                                 "bad magic (got %llu expect %llu) while reading prios from bucket %llu",
390                                 got, expect, bucket);
391
392                         got = le64_to_cpu(p->csum);
393                         expect = bch_checksum(PSET_CSUM_TYPE(p),
394                                               &p->magic,
395                                               bucket_bytes(ca) - 8);
396                         unfixable_fsck_err_on(got != expect, c,
397                                 "bad checksum (got %llu expect %llu) while reading prios from bucket %llu",
398                                 got, expect, bucket);
399
400                         bucket = le64_to_cpu(p->next_bucket);
401                         d = p->data;
402                 }
403
404                 ca->buckets[b].read_prio = le16_to_cpu(d->read_prio);
405                 ca->buckets[b].write_prio = le16_to_cpu(d->write_prio);
406
407                 bucket_cmpxchg(&ca->buckets[b], new, new.gen = d->gen);
408         }
409 fsck_err:
410         return 0;
411 }
412
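/*
 * Cap on how far a bucket's gen may run ahead of the oldest gen of any pointer
 * still pointing into it before we refuse to invalidate that bucket again (see
 * bch_can_invalidate_bucket() below).  Keeping this well inside the 8 bit
 * gen's range is what prevents the wraparound problem described at the top of
 * this file; the particular value is a tradeoff, not anything fundamental.
 */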
413 #define BUCKET_GC_GEN_MAX       96U
414
415 /**
416  * wait_buckets_available - wait on reclaimable buckets
417  *
418  * If there aren't enough available buckets to fill up free_inc, wait until
419  * there are.
420  */
421 static int wait_buckets_available(struct cache *ca)
422 {
423         struct cache_set *c = ca->set;
424         int ret = 0;
425
426         while (1) {
427                 set_current_state(TASK_INTERRUPTIBLE);
428                 if (kthread_should_stop()) {
429                         ret = -1;
430                         break;
431                 }
432
433                 if (ca->inc_gen_needs_gc >= fifo_free(&ca->free_inc)) {
434                         if (c->gc_thread) {
435                                 trace_bcache_gc_cannot_inc_gens(ca->set);
436                                 atomic_inc(&c->kick_gc);
437                                 wake_up_process(ca->set->gc_thread);
438                         }
439
440                         /*
441                          * We are going to wait for GC to wake us up, even if
442                          * bucket counters tell us enough buckets are available,
443                          * because we are actually waiting for GC to rewrite
444                          * nodes with stale pointers
445                          */
446                 } else if (buckets_available_cache(ca) >=
447                            fifo_free(&ca->free_inc))
448                         break;
449
450                 up_read(&ca->set->gc_lock);
451                 schedule();
452                 try_to_freeze();
453                 down_read(&ca->set->gc_lock);
454         }
455
456         __set_current_state(TASK_RUNNING);
457         return ret;
458 }
459
460 static void verify_not_on_freelist(struct cache *ca, size_t bucket)
461 {
462         if (expensive_debug_checks(ca->set)) {
463                 size_t iter;
464                 long i;
465                 unsigned j;
466
467                 for (iter = 0; iter < prio_buckets(ca) * 2; iter++)
468                         BUG_ON(ca->prio_buckets[iter] == bucket);
469
470                 for (j = 0; j < RESERVE_NR; j++)
471                         fifo_for_each_entry(i, &ca->free[j], iter)
472                                 BUG_ON(i == bucket);
473                 fifo_for_each_entry(i, &ca->free_inc, iter)
474                         BUG_ON(i == bucket);
475         }
476 }
477
478 /* Bucket heap / gen */
479
480 void bch_recalc_min_prio(struct cache *ca, int rw)
481 {
482         struct cache_set *c = ca->set;
483         struct prio_clock *clock = &c->prio_clock[rw];
484         struct bucket *g;
485         u16 max_delta = 1;
486         unsigned i;
487
488         /* Determine min prio for this particular cache */
489         for_each_bucket(g, ca)
490                 max_delta = max(max_delta, (u16) (clock->hand - g->prio[rw]));
491
492         ca->min_prio[rw] = clock->hand - max_delta;
493
494         /*
495          * This may in turn increase the min prio for the whole cache set;
496          * check that as well.
497          */
498         max_delta = 1;
499
500         for_each_cache(ca, c, i)
501                 max_delta = max(max_delta,
502                                 (u16) (clock->hand - ca->min_prio[rw]));
503
504         clock->min_prio = clock->hand - max_delta;
505 }
506
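/*
 * Halve every bucket's distance from the clock hand (g->prio = hand -
 * (hand - prio) / 2): e.g. a bucket that was 200 ticks behind the hand ends up
 * 100 behind.  Relative ordering is preserved, but the prios in use are
 * compressed towards the hand so it can keep advancing without wrapping into
 * them (see bch_inc_clock_hand() below).
 */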
507 static void bch_rescale_prios(struct cache_set *c, int rw)
508 {
509         struct prio_clock *clock = &c->prio_clock[rw];
510         struct cache *ca;
511         struct bucket *g;
512         unsigned i;
513
514         trace_bcache_rescale_prios(c);
515
516         for_each_cache(ca, c, i) {
517                 for_each_bucket(g, ca)
518                         g->prio[rw] = clock->hand -
519                                 (clock->hand - g->prio[rw]) / 2;
520
521                 bch_recalc_min_prio(ca, rw);
522         }
523 }
524
525 static void bch_inc_clock_hand(struct io_timer *timer)
526 {
527         struct prio_clock *clock = container_of(timer,
528                                         struct prio_clock, rescale);
529         struct cache_set *c = container_of(clock,
530                                 struct cache_set, prio_clock[clock->rw]);
531         u64 capacity;
532
533         mutex_lock(&c->bucket_lock);
534
535         clock->hand++;
536
537         /* if the clock hand can't be advanced any further, rescale prios */
538         if (clock->hand == (u16) (clock->min_prio - 1))
539                 bch_rescale_prios(c, clock->rw);
540
541         mutex_unlock(&c->bucket_lock);
542
543         capacity = READ_ONCE(c->capacity);
544
545         if (!capacity)
546                 return;
547
548         /*
549          * We only advance the clock hand once 0.1% of the cache_set has been
550          * read or written to - this determines how quickly the prio clock runs.
551          *
552          * XXX: we shouldn't really be going off of the capacity of devices in
553          * RW mode (that will be 0 when we're RO, yet we can still service
554          * reads)
555          */
556         timer->expire += capacity >> 10;
557
558         bch_io_timer_add(&c->io_clock[clock->rw], timer);
559 }
560
561 static void bch_prio_timer_init(struct cache_set *c, int rw)
562 {
563         struct prio_clock *clock = &c->prio_clock[rw];
564         struct io_timer *timer = &clock->rescale;
565
566         clock->rw       = rw;
567         timer->fn       = bch_inc_clock_hand;
568         timer->expire   = c->capacity >> 10;
569 }
570
571 /*
572  * Background allocation thread: scans for buckets to be invalidated,
573  * invalidates them, rewrites prios/gens (marking them as invalidated on disk),
574  * then optionally issues discard commands to the newly free buckets, then puts
575  * them on the various freelists.
576  */
577
578 static inline bool can_inc_bucket_gen(struct cache *ca, struct bucket *g)
579 {
580         return bucket_gc_gen(ca, g) < BUCKET_GC_GEN_MAX;
581 }
582
583 static bool bch_can_invalidate_bucket(struct cache *ca, struct bucket *g)
584 {
585         if (!is_available_bucket(READ_ONCE(g->mark)))
586                 return false;
587
588         if (bucket_gc_gen(ca, g) >= BUCKET_GC_GEN_MAX - 1)
589                 ca->inc_gen_needs_gc++;
590
591         return can_inc_bucket_gen(ca, g);
592 }
593
594 static void bch_invalidate_one_bucket(struct cache *ca, struct bucket *g)
595 {
596         spin_lock(&ca->freelist_lock);
597
598         bch_invalidate_bucket(ca, g);
599
600         g->read_prio = ca->set->prio_clock[READ].hand;
601         g->write_prio = ca->set->prio_clock[WRITE].hand;
602
603         verify_not_on_freelist(ca, g - ca->buckets);
604         BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));
605
606         spin_unlock(&ca->freelist_lock);
607 }
608
609 /*
610  * Determines what order we're going to reuse buckets, smallest bucket_sort_key()
611  * first.
612  *
613  *
614  * - We take into account the read prio of the bucket, which gives us an
615  *   indication of how hot the data is -- we scale the prio so that the prio
616  *   farthest from the clock is worth 1/8th of the closest.
617  *
618  * - The number of sectors of cached data in the bucket, which gives us an
619  *   indication of the cost in cache misses this eviction will cause.
620  *
621  * - The difference between the bucket's current gen and oldest gen of any
622  *   pointer into it, which gives us an indication of the cost of an eventual
623  *   btree GC to rewrite nodes with stale pointers.
624  */
625
626 #define bucket_sort_key(g)                                              \
627 ({                                                                      \
628         unsigned long prio = g->read_prio - ca->min_prio[READ];         \
629         prio = (prio * 7) / (ca->set->prio_clock[READ].hand -           \
630                              ca->min_prio[READ]);                       \
631                                                                         \
632         (((prio + 1) * bucket_sectors_used(g)) << 8) | bucket_gc_gen(ca, g);\
633 })
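
/*
 * Worked example of the key above, with made-up numbers: say the READ clock
 * hand is at 1000 and ca->min_prio[READ] is 200.  A cold bucket last read at
 * prio 300 gets scaled prio (100 * 7) / 800 = 0; a hot one at prio 950 gets
 * (750 * 7) / 800 = 6.  The scaled prio plus one then multiplies
 * bucket_sectors_used(), and bucket_gc_gen() only breaks ties in the low 8
 * bits - so cold, mostly empty buckets sort first.
 */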
634
635 static void invalidate_buckets_lru(struct cache *ca)
636 {
637         struct bucket_heap_entry e;
638         struct bucket *g;
639         unsigned i;
640
641         mutex_lock(&ca->heap_lock);
642
643         ca->heap.used = 0;
644
645         mutex_lock(&ca->set->bucket_lock);
646         bch_recalc_min_prio(ca, READ);
647         bch_recalc_min_prio(ca, WRITE);
648
649         /*
650          * Find buckets with lowest read priority, by building a maxheap sorted
651          * by read priority and repeatedly replacing the maximum element until
652          * all buckets have been visited.
653          */
654         for_each_bucket(g, ca) {
655                 if (!bch_can_invalidate_bucket(ca, g))
656                         continue;
657
658                 bucket_heap_push(ca, g, bucket_sort_key(g));
659         }
660
661         /* Sort buckets by physical location on disk for better locality */
662         for (i = 0; i < ca->heap.used; i++) {
663                 struct bucket_heap_entry *e = &ca->heap.data[i];
664
665                 e->val = e->g - ca->buckets;
666         }
667
668         heap_resort(&ca->heap, bucket_max_cmp);
669
670         /*
671          * If we run out of buckets to invalidate, bch_allocator_thread() will
672          * kick stuff and retry us
673          */
674         while (!fifo_full(&ca->free_inc) &&
675                heap_pop(&ca->heap, e, bucket_max_cmp)) {
676                 BUG_ON(!bch_can_invalidate_bucket(ca, e.g));
677                 bch_invalidate_one_bucket(ca, e.g);
678         }
679
680         mutex_unlock(&ca->set->bucket_lock);
681         mutex_unlock(&ca->heap_lock);
682 }
683
684 static void invalidate_buckets_fifo(struct cache *ca)
685 {
686         struct bucket *g;
687         size_t checked = 0;
688
689         while (!fifo_full(&ca->free_inc)) {
690                 if (ca->fifo_last_bucket <  ca->mi.first_bucket ||
691                     ca->fifo_last_bucket >= ca->mi.nbuckets)
692                         ca->fifo_last_bucket = ca->mi.first_bucket;
693
694                 g = ca->buckets + ca->fifo_last_bucket++;
695
696                 if (bch_can_invalidate_bucket(ca, g))
697                         bch_invalidate_one_bucket(ca, g);
698
699                 if (++checked >= ca->mi.nbuckets)
700                         return;
701         }
702 }
703
704 static void invalidate_buckets_random(struct cache *ca)
705 {
706         struct bucket *g;
707         size_t checked = 0;
708
709         while (!fifo_full(&ca->free_inc)) {
710                 size_t n = bch_rand_range(ca->mi.nbuckets -
711                                           ca->mi.first_bucket) +
712                         ca->mi.first_bucket;
713
714                 g = ca->buckets + n;
715
716                 if (bch_can_invalidate_bucket(ca, g))
717                         bch_invalidate_one_bucket(ca, g);
718
719                 if (++checked >= ca->mi.nbuckets / 2)
720                         return;
721         }
722 }
723
724 static void invalidate_buckets(struct cache *ca)
725 {
726         ca->inc_gen_needs_gc = 0;
727
728         switch (ca->mi.replacement) {
729         case CACHE_REPLACEMENT_LRU:
730                 invalidate_buckets_lru(ca);
731                 break;
732         case CACHE_REPLACEMENT_FIFO:
733                 invalidate_buckets_fifo(ca);
734                 break;
735         case CACHE_REPLACEMENT_RANDOM:
736                 invalidate_buckets_random(ca);
737                 break;
738         }
739 }
740
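/*
 * Hand a freshly invalidated bucket to one of the reserves.  Note the order:
 * the reserves needed to make forward progress (prio writes, copygc, btree
 * nodes) are topped up before the general purpose freelist.
 */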
741 static bool __bch_allocator_push(struct cache *ca, long bucket)
742 {
743         if (fifo_push(&ca->free[RESERVE_PRIO], bucket))
744                 goto success;
745
746         if (fifo_push(&ca->free[RESERVE_MOVINGGC], bucket))
747                 goto success;
748
749         if (fifo_push(&ca->free[RESERVE_BTREE], bucket))
750                 goto success;
751
752         if (fifo_push(&ca->free[RESERVE_NONE], bucket))
753                 goto success;
754
755         return false;
756 success:
757         closure_wake_up(&ca->set->freelist_wait);
758         return true;
759 }
760
761 static bool bch_allocator_push(struct cache *ca, long bucket)
762 {
763         bool ret;
764
765         spin_lock(&ca->freelist_lock);
766         ret = __bch_allocator_push(ca, bucket);
767         if (ret)
768                 fifo_pop(&ca->free_inc, bucket);
769         spin_unlock(&ca->freelist_lock);
770
771         return ret;
772 }
773
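/*
 * Scan for buckets we can reuse as-is: nothing live in them, never marked as
 * holding metadata, and (per the journal_seq check below) not still waiting on
 * a journal write.  These don't need their gen bumped, so unlike buckets from
 * invalidate_buckets() they can be handed out without a prio write or journal
 * flush.
 */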
774 static void bch_find_empty_buckets(struct cache_set *c, struct cache *ca)
775 {
776         u16 last_seq_ondisk = c->journal.last_seq_ondisk;
777         struct bucket *g;
778
779         for_each_bucket(g, ca) {
780                 struct bucket_mark m = READ_ONCE(g->mark);
781
782                 if (is_available_bucket(m) &&
783                     !m.cached_sectors &&
784                     !m.had_metadata &&
785                     (!m.wait_on_journal ||
786                      ((s16) last_seq_ondisk - (s16) m.journal_seq >= 0))) {
787                         spin_lock(&ca->freelist_lock);
788
789                         bch_mark_alloc_bucket(ca, g, true);
790                         g->read_prio = ca->set->prio_clock[READ].hand;
791                         g->write_prio = ca->set->prio_clock[WRITE].hand;
792
793                         verify_not_on_freelist(ca, g - ca->buckets);
794                         BUG_ON(!fifo_push(&ca->free_inc, g - ca->buckets));
795
796                         spin_unlock(&ca->freelist_lock);
797
798                         if (fifo_full(&ca->free_inc))
799                                 break;
800                 }
801         }
802 }
803
804 /**
805  * bch_allocator_thread - move buckets from free_inc to reserves
806  *
807  * The free_inc FIFO is populated by invalidate_buckets(), and
808  * the reserves are depleted by bucket allocation. When we run out
809  * of free_inc, try to invalidate some buckets and write out
810  * prios and gens.
811  */
812 static int bch_allocator_thread(void *arg)
813 {
814         struct cache *ca = arg;
815         struct cache_set *c = ca->set;
816         int ret;
817
818         set_freezable();
819
820         while (1) {
821                 /*
822                  * First, we pull buckets off of the free_inc list, possibly
823                  * issue discards to them, then we add the bucket to a
824                  * free list:
825                  */
826
827                 while (!fifo_empty(&ca->free_inc)) {
828                         long bucket = fifo_peek(&ca->free_inc);
829
830                         /*
831                          * Don't remove from free_inc until after it's added
832                          * to freelist, so gc doesn't miss it while we've
833                          * dropped bucket lock
834                          */
835
836                         if (ca->mi.discard &&
837                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
838                                 blkdev_issue_discard(ca->disk_sb.bdev,
839                                         bucket_to_sector(ca, bucket),
840                                         ca->mi.bucket_size, GFP_NOIO, 0);
841
842                         while (1) {
843                                 set_current_state(TASK_INTERRUPTIBLE);
844                                 if (bch_allocator_push(ca, bucket))
845                                         break;
846
847                                 if (kthread_should_stop()) {
848                                         __set_current_state(TASK_RUNNING);
849                                         goto out;
850                                 }
851                                 schedule();
852                                 try_to_freeze();
853                         }
854
855                         __set_current_state(TASK_RUNNING);
856                 }
857
858                 down_read(&c->gc_lock);
859
860                 /*
861                  * See if we have buckets we can reuse without invalidating them
862                  * or forcing a journal commit:
863                  */
864                 bch_find_empty_buckets(c, ca);
865
866                 if (fifo_used(&ca->free_inc) * 2 > ca->free_inc.size) {
867                         up_read(&c->gc_lock);
868                         continue;
869                 }
870
871                 /* We've run out of free buckets! */
872
873                 while (!fifo_full(&ca->free_inc)) {
874                         if (wait_buckets_available(ca)) {
875                                 up_read(&c->gc_lock);
876                                 goto out;
877                         }
878
879                         /*
880                          * Find some buckets that we can invalidate, either
881                          * they're completely unused, or only contain clean data
882                          * that's been written back to the backing device or
883                          * another cache tier
884                          */
885
886                         invalidate_buckets(ca);
887                         trace_bcache_alloc_batch(ca, fifo_used(&ca->free_inc),
888                                                  ca->free_inc.size);
889                 }
890
891                 up_read(&c->gc_lock);
892
893                 /*
894                  * free_inc is full of newly-invalidated buckets, must write out
895                  * prios and gens before they can be re-used
896                  */
897                 ret = bch_prio_write(ca);
898                 if (ret) {
899                         /*
900                          * Emergency read only - allocator thread has to
901                          * shutdown.
902                          *
903                          * N.B. we better be going into RO mode, else
904                          * allocations would hang indefinitely - whatever
905                          * generated the error will have sent us into RO mode.
906                          *
907                          * Clear out the free_inc freelist so things are
908                          * consistent-ish:
909                          */
910                         spin_lock(&ca->freelist_lock);
911                         while (!fifo_empty(&ca->free_inc)) {
912                                 long bucket;
913
914                                 fifo_pop(&ca->free_inc, bucket);
915                                 bch_mark_free_bucket(ca, ca->buckets + bucket);
916                         }
917                         spin_unlock(&ca->freelist_lock);
918                         goto out;
919                 }
920         }
921 out:
922         /*
923          * Avoid a race with bucket_stats_update() trying to wake us up after
924          * we've exited:
925          */
926         synchronize_rcu();
927         return 0;
928 }
929
930 /* Allocation */
931
932 /**
933  * bch_bucket_alloc - allocate a single bucket from a specific device
934  *
935  * Returns index of bucket on success, 0 on failure
936  */
937 static size_t bch_bucket_alloc(struct cache *ca, enum alloc_reserve reserve)
938 {
939         struct bucket *g;
940         long r;
941
942         spin_lock(&ca->freelist_lock);
943         if (fifo_pop(&ca->free[RESERVE_NONE], r) ||
944             fifo_pop(&ca->free[reserve], r))
945                 goto out;
946
947         spin_unlock(&ca->freelist_lock);
948
949         trace_bcache_bucket_alloc_fail(ca, reserve);
950         return 0;
951 out:
952         verify_not_on_freelist(ca, r);
953         spin_unlock(&ca->freelist_lock);
954
955         trace_bcache_bucket_alloc(ca, reserve);
956
957         bch_wake_allocator(ca);
958
959         g = ca->buckets + r;
960
961         g->read_prio = ca->set->prio_clock[READ].hand;
962         g->write_prio = ca->set->prio_clock[WRITE].hand;
963
964         return r;
965 }
966
967 static void __bch_bucket_free(struct cache *ca, struct bucket *g)
968 {
969         bch_mark_free_bucket(ca, g);
970
971         g->read_prio = ca->set->prio_clock[READ].hand;
972         g->write_prio = ca->set->prio_clock[WRITE].hand;
973 }
974
975 enum bucket_alloc_ret {
976         ALLOC_SUCCESS,
977         NO_DEVICES,             /* -EROFS */
978         FREELIST_EMPTY,         /* Allocator thread not keeping up */
979 };
980
981 static void recalc_alloc_group_weights(struct cache_set *c,
982                                        struct cache_group *devs)
983 {
984         struct cache *ca;
985         u64 available_buckets = 1; /* avoid a divide by zero... */
986         unsigned i;
987
988         for (i = 0; i < devs->nr_devices; i++) {
989                 ca = devs->d[i].dev;
990
991                 devs->d[i].weight = buckets_free_cache(ca);
992                 available_buckets += devs->d[i].weight;
993         }
994
995         for (i = 0; i < devs->nr_devices; i++) {
996                 const unsigned min_weight = U32_MAX >> 4;
997                 const unsigned max_weight = U32_MAX;
998
999                 devs->d[i].weight =
1000                         min_weight +
1001                         div64_u64(devs->d[i].weight *
1002                                   devs->nr_devices *
1003                                   (max_weight - min_weight),
1004                                   available_buckets);
1005                 devs->d[i].weight = min_t(u64, devs->d[i].weight, max_weight);
1006         }
1007 }
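
/*
 * Illustration of the weighting above, with made-up numbers: given two devices
 * with 100 and 300 free buckets, available_buckets = 401, so the device with
 * fewer free buckets gets a weight of roughly min_weight plus half the weight
 * range, while the one with more free buckets clamps to max_weight.
 * bch_bucket_alloc_group() below skips a device whenever get_random_int()
 * exceeds its weight, so devices with more free space are picked
 * proportionally more often.
 */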
1008
1009 static enum bucket_alloc_ret bch_bucket_alloc_group(struct cache_set *c,
1010                                                     struct open_bucket *ob,
1011                                                     enum alloc_reserve reserve,
1012                                                     unsigned nr_replicas,
1013                                                     struct cache_group *devs,
1014                                                     long *caches_used)
1015 {
1016         enum bucket_alloc_ret ret;
1017         unsigned fail_idx = -1, i;
1018         unsigned available = 0;
1019
1020         BUG_ON(nr_replicas > ARRAY_SIZE(ob->ptrs));
1021
1022         if (ob->nr_ptrs >= nr_replicas)
1023                 return ALLOC_SUCCESS;
1024
1025         rcu_read_lock();
1026         spin_lock(&devs->lock);
1027
1028         for (i = 0; i < devs->nr_devices; i++)
1029                 available += !test_bit(devs->d[i].dev->sb.nr_this_dev,
1030                                        caches_used);
1031
1032         recalc_alloc_group_weights(c, devs);
1033
1034         i = devs->cur_device;
1035
1036         while (ob->nr_ptrs < nr_replicas) {
1037                 struct cache *ca;
1038                 u64 bucket;
1039
1040                 if (!available) {
1041                         ret = NO_DEVICES;
1042                         goto err;
1043                 }
1044
1045                 i++;
1046                 i %= devs->nr_devices;
1047
1048                 ret = FREELIST_EMPTY;
1049                 if (i == fail_idx)
1050                         goto err;
1051
1052                 ca = devs->d[i].dev;
1053
1054                 if (test_bit(ca->sb.nr_this_dev, caches_used))
1055                         continue;
1056
1057                 if (fail_idx == -1 &&
1058                     get_random_int() > devs->d[i].weight)
1059                         continue;
1060
1061                 bucket = bch_bucket_alloc(ca, reserve);
1062                 if (!bucket) {
1063                         if (fail_idx == -1)
1064                                 fail_idx = i;
1065                         continue;
1066                 }
1067
1068                 /*
1069                  * open_bucket_add_buckets expects new pointers at the head of
1070                  * the list:
1071                  */
1072                 memmove(&ob->ptrs[1],
1073                         &ob->ptrs[0],
1074                         ob->nr_ptrs * sizeof(ob->ptrs[0]));
1075                 memmove(&ob->ptr_offset[1],
1076                         &ob->ptr_offset[0],
1077                         ob->nr_ptrs * sizeof(ob->ptr_offset[0]));
1078                 ob->nr_ptrs++;
1079                 ob->ptrs[0] = (struct bch_extent_ptr) {
1080                         .gen    = ca->buckets[bucket].mark.gen,
1081                         .offset = bucket_to_sector(ca, bucket),
1082                         .dev    = ca->sb.nr_this_dev,
1083                 };
1084                 ob->ptr_offset[0] = 0;
1085
1086                 __set_bit(ca->sb.nr_this_dev, caches_used);
1087                 available--;
1088                 devs->cur_device = i;
1089         }
1090
1091         ret = ALLOC_SUCCESS;
1092 err:
1093         EBUG_ON(ret != ALLOC_SUCCESS && reserve == RESERVE_MOVINGGC);
1094         spin_unlock(&devs->lock);
1095         rcu_read_unlock();
1096         return ret;
1097 }
1098
1099 static enum bucket_alloc_ret __bch_bucket_alloc_set(struct cache_set *c,
1100                                                     struct write_point *wp,
1101                                                     struct open_bucket *ob,
1102                                                     unsigned nr_replicas,
1103                                                     enum alloc_reserve reserve,
1104                                                     long *caches_used)
1105 {
1106         /*
1107          * this should implement policy - for a given type of allocation, decide
1108          * which devices to allocate from:
1109          *
1110          * XXX: switch off wp->type and do something more intelligent here
1111          */
1112
1113         /* foreground writes: prefer tier 0: */
1114         if (wp->group == &c->cache_all)
1115                 bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
1116                                        &c->cache_tiers[0], caches_used);
1117
1118         return bch_bucket_alloc_group(c, ob, reserve, nr_replicas,
1119                                       wp->group, caches_used);
1120 }
1121
1122 static int bch_bucket_alloc_set(struct cache_set *c, struct write_point *wp,
1123                                 struct open_bucket *ob, unsigned nr_replicas,
1124                                 enum alloc_reserve reserve, long *caches_used,
1125                                 struct closure *cl)
1126 {
1127         bool waiting = false;
1128
1129         while (1) {
1130                 switch (__bch_bucket_alloc_set(c, wp, ob, nr_replicas,
1131                                                reserve, caches_used)) {
1132                 case ALLOC_SUCCESS:
1133                         if (waiting)
1134                                 closure_wake_up(&c->freelist_wait);
1135
1136                         return 0;
1137
1138                 case NO_DEVICES:
1139                         if (waiting)
1140                                 closure_wake_up(&c->freelist_wait);
1141                         return -EROFS;
1142
1143                 case FREELIST_EMPTY:
1144                         if (!cl || waiting)
1145                                 trace_bcache_freelist_empty_fail(c,
1146                                                         reserve, cl);
1147
1148                         if (!cl)
1149                                 return -ENOSPC;
1150
1151                         if (waiting)
1152                                 return -EAGAIN;
1153
1154                         /* Retry allocation after adding ourself to waitlist: */
1155                         closure_wait(&c->freelist_wait, cl);
1156                         waiting = true;
1157                         break;
1158                 default:
1159                         BUG();
1160                 }
1161         }
1162 }
1163
1164 /* Open buckets: */
1165
1166 /*
1167  * Open buckets represent one or more buckets (on multiple devices) that are
1168  * currently being allocated from. They serve two purposes:
1169  *
1170  *  - They track buckets that have been partially allocated, allowing for
1171  *    sub-bucket sized allocations - they're used by the sector allocator below
1172  *
1173  *  - They provide a reference to the buckets they own that mark and sweep GC
1174  *    can find, until the new allocation has a pointer to it inserted into the
1175  *    btree
1176  *
1177  * When allocating some space with the sector allocator, the allocation comes
1178  * with a reference to an open bucket - the caller is required to put that
1179  * reference _after_ doing the index update that makes its allocation reachable.
1180  */
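
/*
 * Sketch of the calling convention implied above (error handling omitted; the
 * real callers live in the write path, e.g. __bch_write()):
 *
 *	ob = bch_alloc_sectors(c, wp, e, nr_replicas, reserve, cl);
 *	// write the data to the space now described by e's pointers
 *	// do the index update that makes the allocation reachable
 *	bch_open_bucket_put(c, ob);
 */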
1181
1182 static void __bch_open_bucket_put(struct cache_set *c, struct open_bucket *ob)
1183 {
1184         const struct bch_extent_ptr *ptr;
1185         struct cache *ca;
1186
1187         lockdep_assert_held(&c->open_buckets_lock);
1188
1189         rcu_read_lock();
1190         open_bucket_for_each_online_device(c, ob, ptr, ca)
1191                 bch_mark_alloc_bucket(ca, PTR_BUCKET(ca, ptr), false);
1192         rcu_read_unlock();
1193
1194         ob->nr_ptrs = 0;
1195
1196         list_move(&ob->list, &c->open_buckets_free);
1197         c->open_buckets_nr_free++;
1198         closure_wake_up(&c->open_buckets_wait);
1199 }
1200
1201 void bch_open_bucket_put(struct cache_set *c, struct open_bucket *b)
1202 {
1203         if (atomic_dec_and_test(&b->pin)) {
1204                 spin_lock(&c->open_buckets_lock);
1205                 __bch_open_bucket_put(c, b);
1206                 spin_unlock(&c->open_buckets_lock);
1207         }
1208 }
1209
1210 static struct open_bucket *bch_open_bucket_get(struct cache_set *c,
1211                                                unsigned nr_reserved,
1212                                                struct closure *cl)
1213 {
1214         struct open_bucket *ret;
1215
1216         spin_lock(&c->open_buckets_lock);
1217
1218         if (c->open_buckets_nr_free > nr_reserved) {
1219                 BUG_ON(list_empty(&c->open_buckets_free));
1220                 ret = list_first_entry(&c->open_buckets_free,
1221                                        struct open_bucket, list);
1222                 list_move(&ret->list, &c->open_buckets_open);
1223                 BUG_ON(ret->nr_ptrs);
1224
1225                 atomic_set(&ret->pin, 1); /* XXX */
1226                 ret->has_full_ptrs      = false;
1227
1228                 c->open_buckets_nr_free--;
1229                 trace_bcache_open_bucket_alloc(c, cl);
1230         } else {
1231                 trace_bcache_open_bucket_alloc_fail(c, cl);
1232
1233                 if (cl) {
1234                         closure_wait(&c->open_buckets_wait, cl);
1235                         ret = ERR_PTR(-EAGAIN);
1236                 } else
1237                         ret = ERR_PTR(-ENOSPC);
1238         }
1239
1240         spin_unlock(&c->open_buckets_lock);
1241
1242         return ret;
1243 }
1244
1245 static unsigned ob_ptr_sectors_free(struct open_bucket *ob,
1246                                     struct cache_member_rcu *mi,
1247                                     struct bch_extent_ptr *ptr)
1248 {
1249         unsigned i = ptr - ob->ptrs;
1250         unsigned bucket_size = mi->m[ptr->dev].bucket_size;
1251         unsigned used = (ptr->offset & (bucket_size - 1)) +
1252                 ob->ptr_offset[i];
1253
1254         BUG_ON(used > bucket_size);
1255
1256         return bucket_size - used;
1257 }
1258
1259 static unsigned open_bucket_sectors_free(struct cache_set *c,
1260                                          struct open_bucket *ob,
1261                                          unsigned nr_replicas)
1262 {
1263         struct cache_member_rcu *mi = cache_member_info_get(c);
1264         unsigned i, sectors_free = UINT_MAX;
1265
1266         BUG_ON(nr_replicas > ob->nr_ptrs);
1267
1268         for (i = 0; i < nr_replicas; i++)
1269                 sectors_free = min(sectors_free,
1270                                    ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]));
1271
1272         cache_member_info_put();
1273
1274         return sectors_free != UINT_MAX ? sectors_free : 0;
1275 }
1276
1277 static void open_bucket_copy_unused_ptrs(struct cache_set *c,
1278                                          struct open_bucket *new,
1279                                          struct open_bucket *old)
1280 {
1281         struct cache_member_rcu *mi = cache_member_info_get(c);
1282         unsigned i;
1283
1284         for (i = 0; i < old->nr_ptrs; i++)
1285                 if (ob_ptr_sectors_free(old, mi, &old->ptrs[i])) {
1286                         struct bch_extent_ptr tmp = old->ptrs[i];
1287
1288                         tmp.offset += old->ptr_offset[i];
1289                         new->ptrs[new->nr_ptrs] = tmp;
1290                         new->ptr_offset[new->nr_ptrs] = 0;
1291                         new->nr_ptrs++;
1292                 }
1293         cache_member_info_put();
1294 }
1295
1296 static void verify_not_stale(struct cache_set *c, const struct open_bucket *ob)
1297 {
1298 #ifdef CONFIG_BCACHE_DEBUG
1299         const struct bch_extent_ptr *ptr;
1300         struct cache *ca;
1301
1302         rcu_read_lock();
1303         open_bucket_for_each_online_device(c, ob, ptr, ca)
1304                 BUG_ON(ptr_stale(ca, ptr));
1305         rcu_read_unlock();
1306 #endif
1307 }
1308
1309 /* Sector allocator */
1310
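/*
 * Lock the open bucket currently attached to @wp.  wp->b can be swapped out
 * from under us (see the cmpxchg in bch_alloc_sectors_start()), so take
 * ob->lock and then recheck that wp->b still points at the same open bucket,
 * retrying if we raced.
 */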
1311 static struct open_bucket *lock_writepoint(struct cache_set *c,
1312                                            struct write_point *wp)
1313 {
1314         struct open_bucket *ob;
1315
1316         while ((ob = ACCESS_ONCE(wp->b))) {
1317                 mutex_lock(&ob->lock);
1318                 if (wp->b == ob)
1319                         break;
1320
1321                 mutex_unlock(&ob->lock);
1322         }
1323
1324         return ob;
1325 }
1326
1327 static int open_bucket_add_buckets(struct cache_set *c,
1328                                    struct write_point *wp,
1329                                    struct open_bucket *ob,
1330                                    unsigned nr_replicas,
1331                                    enum alloc_reserve reserve,
1332                                    struct closure *cl)
1333 {
1334         long caches_used[BITS_TO_LONGS(MAX_CACHES_PER_SET)];
1335         int i, dst;
1336
1337         /*
1338          * We might be allocating pointers to add to an existing extent
1339          * (tiering/copygc/migration) - if so, some of the pointers in our
1340          * existing open bucket might duplicate devices we already have. This is
1341          * moderately annoying.
1342          */
1343
1344         /* Short circuit all the fun stuff if possible: */
1345         if (ob->nr_ptrs >= nr_replicas)
1346                 return 0;
1347
1348         memset(caches_used, 0, sizeof(caches_used));
1349
1350         /*
1351          * Shuffle pointers to devices we already have to the end:
1352          * bch_bucket_alloc_set() will add new pointers to the statr of @b, and
1353          * bch_alloc_sectors_done() will add the first nr_replicas ptrs to @e:
1354          */
1355         for (i = dst = ob->nr_ptrs - 1; i >= 0; --i)
1356                 if (__test_and_set_bit(ob->ptrs[i].dev, caches_used)) {
1357                         if (i != dst) {
1358                                 swap(ob->ptrs[i], ob->ptrs[dst]);
1359                                 swap(ob->ptr_offset[i], ob->ptr_offset[dst]);
1360                         }
1361                         --dst;
1362                         nr_replicas++;
1363                 }
1364
1365         return bch_bucket_alloc_set(c, wp, ob, nr_replicas,
1366                                     reserve, caches_used, cl);
1367 }
1368
1369 /*
1370  * Get us an open_bucket we can allocate from, return with it locked:
1371  */
1372 struct open_bucket *bch_alloc_sectors_start(struct cache_set *c,
1373                                             struct write_point *wp,
1374                                             unsigned nr_replicas,
1375                                             enum alloc_reserve reserve,
1376                                             struct closure *cl)
1377 {
1378         struct open_bucket *ob;
1379         unsigned open_buckets_reserved = wp == &c->btree_write_point
1380                 ? 0 : BTREE_NODE_RESERVE;
1381         int ret;
1382
1383         BUG_ON(!wp->group);
1384         BUG_ON(!reserve);
1385         BUG_ON(!nr_replicas);
1386 retry:
1387         ob = lock_writepoint(c, wp);
1388
1389         /*
1390          * If ob->sectors_free == 0, one or more of the buckets ob points to is
1391          * full. We can't drop pointers from an open bucket - garbage collection
1392          * still needs to find them; instead, we must allocate a new open bucket
1393          * and copy any pointers to non-full buckets into the new open bucket.
1394          */
1395         if (!ob || ob->has_full_ptrs) {
1396                 struct open_bucket *new_ob;
1397
1398                 new_ob = bch_open_bucket_get(c, open_buckets_reserved, cl);
1399                 if (IS_ERR(new_ob))
1400                         return new_ob;
1401
1402                 mutex_lock(&new_ob->lock);
1403
1404                 /*
1405                  * We point the write point at the open_bucket before doing the
1406                  * allocation to avoid a race with shutdown:
1407                  */
1408                 if (race_fault() ||
1409                     cmpxchg(&wp->b, ob, new_ob) != ob) {
1410                         /* We raced: */
1411                         mutex_unlock(&new_ob->lock);
1412                         bch_open_bucket_put(c, new_ob);
1413
1414                         if (ob)
1415                                 mutex_unlock(&ob->lock);
1416                         goto retry;
1417                 }
1418
1419                 if (ob) {
1420                         open_bucket_copy_unused_ptrs(c, new_ob, ob);
1421                         mutex_unlock(&ob->lock);
1422                         bch_open_bucket_put(c, ob);
1423                 }
1424
1425                 ob = new_ob;
1426         }
1427
1428         ret = open_bucket_add_buckets(c, wp, ob, nr_replicas,
1429                                       reserve, cl);
1430         if (ret) {
1431                 mutex_unlock(&ob->lock);
1432                 return ERR_PTR(ret);
1433         }
1434
1435         ob->sectors_free = open_bucket_sectors_free(c, ob, nr_replicas);
1436
1437         BUG_ON(!ob->sectors_free);
1438         verify_not_stale(c, ob);
1439
1440         return ob;
1441 }
1442
1443 /*
1444  * Append pointers to the space we just allocated to @k, and mark @sectors space
1445  * as allocated out of @ob
1446  */
1447 void bch_alloc_sectors_append_ptrs(struct cache_set *c, struct bkey_i_extent *e,
1448                                    unsigned nr_replicas, struct open_bucket *ob,
1449                                    unsigned sectors)
1450 {
1451         struct bch_extent_ptr tmp, *ptr;
1452         struct cache *ca;
1453         bool has_data = false;
1454         unsigned i;
1455
1456         /*
1457          * We're keeping any existing pointer k has, and appending new pointers:
1458          * __bch_write() will only write to the pointers we add here:
1459          */
1460
1461         /*
1462          * XXX: don't add pointers to devices @e already has
1463          */
1464         BUG_ON(nr_replicas > ob->nr_ptrs);
1465         BUG_ON(sectors > ob->sectors_free);
1466
1467         /* didn't use all the ptrs: */
1468         if (nr_replicas < ob->nr_ptrs)
1469                 has_data = true;
1470
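        /*
         * ob->ptr_offset[i] is how many sectors of bucket i have already been
         * handed out to previous writes; adding it to the bucket's start offset
         * (and then advancing it) gives consecutive writes contiguous space:
         */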
1471         for (i = 0; i < nr_replicas; i++) {
1472                 EBUG_ON(bch_extent_has_device(extent_i_to_s_c(e), ob->ptrs[i].dev));
1473
1474                 tmp = ob->ptrs[i];
1475                 tmp.offset += ob->ptr_offset[i];
1476                 extent_ptr_append(e, tmp);
1477
1478                 ob->ptr_offset[i] += sectors;
1479         }
1480
1481         open_bucket_for_each_online_device(c, ob, ptr, ca)
1482                 this_cpu_add(*ca->sectors_written, sectors);
1483 }
1484
1485 /*
1486  * Finish allocating from @ob: note whether any of its buckets are now full;
1487  * take a ref and leave @ob on @wp if it still has free space, else detach it
1488  */
1489 void bch_alloc_sectors_done(struct cache_set *c, struct write_point *wp,
1490                             struct open_bucket *ob)
1491 {
1492         struct cache_member_rcu *mi = cache_member_info_get(c);
1493         bool has_data = false;
1494         unsigned i;
1495
1496         for (i = 0; i < ob->nr_ptrs; i++) {
1497                 if (!ob_ptr_sectors_free(ob, mi, &ob->ptrs[i]))
1498                         ob->has_full_ptrs = true;
1499                 else
1500                         has_data = true;
1501         }
1502
1503         cache_member_info_put();
1504
1505         if (likely(has_data))
1506                 atomic_inc(&ob->pin);
1507         else
1508                 BUG_ON(xchg(&wp->b, NULL) != ob);
1509
1510         mutex_unlock(&ob->lock);
1511 }
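
/*
 * Note on the split interface above: on success bch_alloc_sectors_start()
 * returns with ob->lock held; the caller appends pointers with
 * bch_alloc_sectors_append_ptrs() and must then call bch_alloc_sectors_done(),
 * which drops the lock - bch_alloc_sectors() below is the canonical sequence.
 */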
1512
1513 /*
1514  * Allocates some space in the cache to write to, and appends pointers to the
1515  * newly allocated space to @e. May allocate fewer sectors than @e originally
1516  * covered; on return, e->k.size indicates how many sectors were allocated.
1517  *
1518  * Errors are returned as ERR_PTR():
1519  * - -EAGAIN: @cl was added to the waitlist
1520  * - -ENOSPC: out of space and no closure provided
1521  *
1522  * @c           - cache set
1523  * @wp          - write point to use for allocating sectors
1524  * @e           - extent the allocated space is appended to
1525  * @nr_replicas - number of replicas (pointers) to allocate
1526  * @reserve     - which bucket reserve to allocate from
1527  * @cl          - closure to wait on if no buckets are immediately
1528  *               available; may be NULL
1529  */
1530 struct open_bucket *bch_alloc_sectors(struct cache_set *c,
1531                                       struct write_point *wp,
1532                                       struct bkey_i_extent *e,
1533                                       unsigned nr_replicas,
1534                                       enum alloc_reserve reserve,
1535                                       struct closure *cl)
1536 {
1537         struct open_bucket *ob;
1538
1539         ob = bch_alloc_sectors_start(c, wp, nr_replicas, reserve, cl);
1540         if (IS_ERR_OR_NULL(ob))
1541                 return ob;
1542
1543         if (e->k.size > ob->sectors_free)
1544                 bch_key_resize(&e->k, ob->sectors_free);
1545
1546         bch_alloc_sectors_append_ptrs(c, e, nr_replicas, ob, e->k.size);
1547
1548         bch_alloc_sectors_done(c, wp, ob);
1549
1550         return ob;
1551 }
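
/*
 * Illustrative sketch only - not taken from the actual caller (the real write
 * path lives in __bch_write()); error handling is simplified and the
 * surrounding retry/wait logic and labels are assumptions:
 *
 *	ob = bch_alloc_sectors(c, wp, e, nr_replicas, reserve, cl);
 *	if (IS_ERR_OR_NULL(ob)) {
 *		if (PTR_ERR(ob) == -EAGAIN)
 *			goto wait_and_retry;	// @cl was put on a waitlist
 *		goto err;			// e.g. -ENOSPC
 *	}
 *
 *	// e->k.size may have been shrunk to what was available; write that
 *	// many sectors to the pointers just appended to @e.
 */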
1552
1553 /* Startup/shutdown (ro/rw): */
1554
1555 static void bch_recalc_capacity(struct cache_set *c)
1556 {
1557         struct cache_group *tier;
1558         struct cache *ca;
1559         u64 total_capacity, capacity = 0, reserved_sectors = 0;
1560         unsigned long ra_pages = 0;
1561         unsigned i, j;
1562
1563         rcu_read_lock();
1564         for_each_cache_rcu(ca, c, i) {
1565                 struct backing_dev_info *bdi =
1566                         blk_get_backing_dev_info(ca->disk_sb.bdev);
1567
1568                 ra_pages += bdi->ra_pages;
1569         }
1570
1571         c->bdi.ra_pages = ra_pages;
1572
1573         /*
1574          * Capacity of the cache set is the capacity of all the devices in the
1575          * slowest (highest) tier - we don't include lower tier devices.
1576          */
1577         for (tier = c->cache_tiers + ARRAY_SIZE(c->cache_tiers) - 1;
1578              tier > c->cache_tiers && !tier->nr_devices;
1579              --tier)
1580                 ;
1581
1582         group_for_each_cache_rcu(ca, tier, i) {
1583                 size_t reserve = 0;
1584
1585                 /*
1586                  * We need to reserve buckets (from the number
1587                  * of currently available buckets) against
1588                  * foreground writes so that mainly copygc can
1589                  * make forward progress.
1590                  *
1591                  * We need enough to refill the various reserves
1592                  * from scratch - copygc will use its entire
1593          * reserve all at once, then run again when
1594                  * its reserve is refilled (from the formerly
1595                  * available buckets).
1596                  *
1597                  * This reserve is just used when considering if
1598                  * allocations for foreground writes must wait -
1599                  * not -ENOSPC calculations.
1600                  */
1601                 for (j = 0; j < RESERVE_NONE; j++)
1602                         reserve += ca->free[j].size;
1603
1604                 reserve += ca->free_inc.size;
1605
1606                 reserve += ARRAY_SIZE(c->write_points);
1607
1608                 if (ca->mi.tier)
1609                         reserve += 1;   /* tiering write point */
1610                 reserve += 1;           /* btree write point */
1611
1612                 reserved_sectors += reserve << ca->bucket_bits;
1613
1614                 capacity += (ca->mi.nbuckets -
1615                              ca->mi.first_bucket) <<
1616                         ca->bucket_bits;
1617         }
1618         rcu_read_unlock();
1619
1620         total_capacity = capacity;
1621
1622         capacity *= (100 - c->opts.gc_reserve_percent);
1623         capacity = div64_u64(capacity, 100);
1624
1625         BUG_ON(capacity + reserved_sectors > total_capacity);
1626
1627         c->capacity = capacity;
1628
1629         if (c->capacity) {
1630                 bch_io_timer_add(&c->io_clock[READ],
1631                                  &c->prio_clock[READ].rescale);
1632                 bch_io_timer_add(&c->io_clock[WRITE],
1633                                  &c->prio_clock[WRITE].rescale);
1634         } else {
1635                 bch_io_timer_del(&c->io_clock[READ],
1636                                  &c->prio_clock[READ].rescale);
1637                 bch_io_timer_del(&c->io_clock[WRITE],
1638                                  &c->prio_clock[WRITE].rescale);
1639         }
1640
1641         /* Wake up in case someone was waiting for buckets */
1642         closure_wake_up(&c->freelist_wait);
1643 }
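
/*
 * Worked example of the capacity calculation above (all numbers are made up,
 * purely illustrative): a device in the highest populated tier with
 * nbuckets = 1024, first_bucket = 1 and bucket_bits = 10 (512 KiB buckets)
 * contributes (1024 - 1) << 10 = 1,047,552 sectors; with
 * gc_reserve_percent = 20, c->capacity = 1,047,552 * 80 / 100 = 838,041
 * sectors. reserved_sectors is only sanity-checked against the raw total
 * here, not subtracted from c->capacity.
 */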
1644
1645 static void bch_stop_write_point(struct cache *ca,
1646                                  struct write_point *wp)
1647 {
1648         struct cache_set *c = ca->set;
1649         struct open_bucket *ob;
1650         struct bch_extent_ptr *ptr;
1651
1652         ob = lock_writepoint(c, wp);
1653         if (!ob)
1654                 return;
1655
1656         for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
1657                 if (ptr->dev == ca->sb.nr_this_dev)
1658                         goto found;
1659
1660         mutex_unlock(&ob->lock);
1661         return;
1662 found:
1663         BUG_ON(xchg(&wp->b, NULL) != ob);
1664         mutex_unlock(&ob->lock);
1665
1666         /* Drop writepoint's ref: */
1667         bch_open_bucket_put(c, ob);
1668 }
1669
1670 static bool bch_dev_has_open_write_point(struct cache *ca)
1671 {
1672         struct cache_set *c = ca->set;
1673         struct bch_extent_ptr *ptr;
1674         struct open_bucket *ob;
1675
1676         for (ob = c->open_buckets;
1677              ob < c->open_buckets + ARRAY_SIZE(c->open_buckets);
1678              ob++)
1679                 if (atomic_read(&ob->pin)) {
1680                         mutex_lock(&ob->lock);
1681                         for (ptr = ob->ptrs; ptr < ob->ptrs + ob->nr_ptrs; ptr++)
1682                                 if (ptr->dev == ca->sb.nr_this_dev) {
1683                                         mutex_unlock(&ob->lock);
1684                                         return true;
1685                                 }
1686                         mutex_unlock(&ob->lock);
1687                 }
1688
1689         return false;
1690 }
1691
1692 /* device goes ro: */
1693 void bch_cache_allocator_stop(struct cache *ca)
1694 {
1695         struct cache_set *c = ca->set;
1696         struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
1697         struct task_struct *p;
1698         struct closure cl;
1699         unsigned i;
1700
1701         closure_init_stack(&cl);
1702
1703         /* First, remove device from allocation groups: */
1704
1705         bch_cache_group_remove_cache(tier, ca);
1706         bch_cache_group_remove_cache(&c->cache_all, ca);
1707
1708         bch_recalc_capacity(c);
1709
1710         /*
1711          * Stopping the allocator thread comes after removing from allocation
1712          * groups, else pending allocations will hang:
1713          */
1714
1715         p = ca->alloc_thread;
1716         ca->alloc_thread = NULL;
1717         smp_wmb();
1718
1719         /*
1720          * We need an rcu barrier between setting ca->alloc_thread = NULL and
1721          * the thread shutting down to avoid a race with bucket_stats_update() -
1722          * the allocator thread itself does a synchronize_rcu() on exit.
1723          *
1724          * XXX: it would be better to have the rcu barrier be asynchronous
1725          * instead of blocking us here
1726          */
1727         if (p) {
1728                 kthread_stop(p);
1729                 put_task_struct(p);
1730         }
1731
1732         /* Next, close write points that point to this device... */
1733
1734         for (i = 0; i < ARRAY_SIZE(c->write_points); i++)
1735                 bch_stop_write_point(ca, &c->write_points[i]);
1736
1737         bch_stop_write_point(ca, &ca->copygc_write_point);
1738         bch_stop_write_point(ca, &c->promote_write_point);
1739         bch_stop_write_point(ca, &ca->tiering_write_point);
1740         bch_stop_write_point(ca, &c->migration_write_point);
1741         bch_stop_write_point(ca, &c->btree_write_point);
1742
1743         mutex_lock(&c->btree_reserve_cache_lock);
1744         while (c->btree_reserve_cache_nr) {
1745                 struct btree_alloc *a =
1746                         &c->btree_reserve_cache[--c->btree_reserve_cache_nr];
1747
1748                 bch_open_bucket_put(c, a->ob);
1749         }
1750         mutex_unlock(&c->btree_reserve_cache_lock);
1751
1752         /* Avoid deadlocks... */
1753
1754         closure_wake_up(&c->freelist_wait);
1755         wake_up(&c->journal.wait);
1756
1757         /* Now wait for any in flight writes: */
1758
1759         while (1) {
1760                 closure_wait(&c->open_buckets_wait, &cl);
1761
1762                 if (!bch_dev_has_open_write_point(ca)) {
1763                         closure_wake_up(&c->open_buckets_wait);
1764                         break;
1765                 }
1766
1767                 closure_sync(&cl);
1768         }
1769 }
1770
1771 /*
1772  * Startup the allocator thread for transition to RW mode:
1773  */
1774 int bch_cache_allocator_start(struct cache *ca)
1775 {
1776         struct cache_set *c = ca->set;
1777         struct cache_group *tier = &c->cache_tiers[ca->mi.tier];
1778         struct task_struct *k;
1779
1780         /*
1781          * allocator thread already started?
1782          */
1783         if (ca->alloc_thread)
1784                 return 0;
1785
1786         k = kthread_create(bch_allocator_thread, ca, "bcache_allocator");
1787         if (IS_ERR(k))
1788                 return PTR_ERR(k);
1789
1790         get_task_struct(k);
1791         ca->alloc_thread = k;
1792
1793         bch_cache_group_add_cache(tier, ca);
1794         bch_cache_group_add_cache(&c->cache_all, ca);
1795
1796         bch_recalc_capacity(c);
1797
1798         /*
1799          * Don't wake up allocator thread until after adding device to
1800          * allocator groups - otherwise, alloc thread could get a spurious
1801          * -EROFS due to prio_write() -> journal_meta() not finding any devices:
1802          */
1803         wake_up_process(k);
1804         return 0;
1805 }
1806
1807 void bch_open_buckets_init(struct cache_set *c)
1808 {
1809         unsigned i;
1810
1811         INIT_LIST_HEAD(&c->open_buckets_open);
1812         INIT_LIST_HEAD(&c->open_buckets_free);
1813         spin_lock_init(&c->open_buckets_lock);
1814         bch_prio_timer_init(c, READ);
1815         bch_prio_timer_init(c, WRITE);
1816
1817         /* open bucket 0 is a sentinel NULL: */
1818         mutex_init(&c->open_buckets[0].lock);
1819         INIT_LIST_HEAD(&c->open_buckets[0].list);
1820
1821         for (i = 1; i < ARRAY_SIZE(c->open_buckets); i++) {
1822                 mutex_init(&c->open_buckets[i].lock);
1823                 c->open_buckets_nr_free++;
1824                 list_add(&c->open_buckets[i].list, &c->open_buckets_free);
1825         }
1826
1827         spin_lock_init(&c->cache_all.lock);
1828
1829         for (i = 0; i < ARRAY_SIZE(c->write_points); i++) {
1830                 c->write_points[i].throttle = true;
1831                 c->write_points[i].group = &c->cache_tiers[0];
1832         }
1833
1834         for (i = 0; i < ARRAY_SIZE(c->cache_tiers); i++)
1835                 spin_lock_init(&c->cache_tiers[i].lock);
1836
1837         c->promote_write_point.group = &c->cache_tiers[0];
1838
1839         c->migration_write_point.group = &c->cache_all;
1840
1841         c->btree_write_point.group = &c->cache_all;
1842
1843         c->pd_controllers_update_seconds = 5;
1844         INIT_DELAYED_WORK(&c->pd_controllers_update, pd_controllers_update);
1845
1846         spin_lock_init(&c->foreground_write_pd_lock);
1847         bch_pd_controller_init(&c->foreground_write_pd);
1848         /*
1849          * We do not want the write rate to have an effect on the computed
1850          * rate, for two reasons:
1851          *
1852          * We do not call bch_ratelimit_delay() at all if the write rate
1853          * exceeds 1GB/s. In this case, the PD controller will think we are
1854          * not "keeping up" and not change the rate.
1855          */
1856         c->foreground_write_pd.backpressure = 0;
1857         init_timer(&c->foreground_write_wakeup);
1858
1859         c->foreground_write_wakeup.data = (unsigned long) c;
1860         c->foreground_write_wakeup.function = bch_wake_delayed_writes;
1861 }