1 /*
2  * Some low level IO code, and hacks for various block layer limitations
3  *
4  * Copyright 2010, 2011 Kent Overstreet <kent.overstreet@gmail.com>
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcache.h"
9 #include "alloc.h"
10 #include "bset.h"
11 #include "btree_update.h"
12 #include "buckets.h"
13 #include "checksum.h"
14 #include "compress.h"
15 #include "clock.h"
16 #include "debug.h"
17 #include "error.h"
18 #include "extents.h"
19 #include "io.h"
20 #include "journal.h"
21 #include "keylist.h"
22 #include "move.h"
23 #include "notify.h"
24 #include "stats.h"
25 #include "super-io.h"
26
27 #include <linux/blkdev.h>
28 #include <linux/random.h>
29
30 #include <trace/events/bcache.h>
31
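/*
 * Open coded version of bio_inc_remaining(): mark the bio as chained and bump
 * __bi_remaining, so that completion of a clone/split doesn't complete the
 * parent bio before all submissions against it have finished.
 */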
32 static inline void __bio_inc_remaining(struct bio *bio)
33 {
34         bio_set_flag(bio, BIO_CHAIN);
35         smp_mb__before_atomic();
36         atomic_inc(&bio->__bi_remaining);
37 }
38
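/*
 * generic_make_request() queues recursively submitted bios on
 * current->bio_list and only processes them once the current submission
 * returns; punting to a workqueue here gets the bio submitted from a separate
 * context so it isn't stuck behind the submission we're currently inside.
 */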
39 void bch_generic_make_request(struct bio *bio, struct cache_set *c)
40 {
41         if (current->bio_list) {
42                 spin_lock(&c->bio_submit_lock);
43                 bio_list_add(&c->bio_submit_list, bio);
44                 spin_unlock(&c->bio_submit_lock);
45                 queue_work(bcache_io_wq, &c->bio_submit_work);
46         } else {
47                 generic_make_request(bio);
48         }
49 }
50
51 void bch_bio_submit_work(struct work_struct *work)
52 {
53         struct cache_set *c = container_of(work, struct cache_set,
54                                            bio_submit_work);
55         struct bio_list bl;
56         struct bio *bio;
57
58         spin_lock(&c->bio_submit_lock);
59         bl = c->bio_submit_list;
60         bio_list_init(&c->bio_submit_list);
61         spin_unlock(&c->bio_submit_lock);
62
63         while ((bio = bio_list_pop(&bl)))
64                 generic_make_request(bio);
65 }
66
67 /* Allocate, free from mempool: */
68
69 void bch_bio_free_pages_pool(struct cache_set *c, struct bio *bio)
70 {
71         struct bio_vec *bv;
72         unsigned i;
73
74         bio_for_each_segment_all(bv, bio, i)
75                 if (bv->bv_page != ZERO_PAGE(0))
76                         mempool_free(bv->bv_page, &c->bio_bounce_pages);
77         bio->bi_vcnt = 0;
78 }
79
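/*
 * Allocate a bounce page: prefer plain alloc_page(), and fall back to the
 * bio_bounce_pages mempool (serialized by bio_bounce_pages_lock, unlocked by
 * the caller) once an allocation fails, so that forward progress is still
 * guaranteed under memory pressure.
 */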
80 static void bch_bio_alloc_page_pool(struct cache_set *c, struct bio *bio,
81                                     bool *using_mempool)
82 {
83         struct bio_vec *bv = &bio->bi_io_vec[bio->bi_vcnt++];
84
85         if (likely(!*using_mempool)) {
86                 bv->bv_page = alloc_page(GFP_NOIO);
87                 if (unlikely(!bv->bv_page)) {
88                         mutex_lock(&c->bio_bounce_pages_lock);
89                         *using_mempool = true;
90                         goto pool_alloc;
91
92                 }
93         } else {
94 pool_alloc:
95                 bv->bv_page = mempool_alloc(&c->bio_bounce_pages, GFP_NOIO);
96         }
97
98         bv->bv_len = PAGE_SIZE;
99         bv->bv_offset = 0;
100 }
101
102 void bch_bio_alloc_pages_pool(struct cache_set *c, struct bio *bio,
103                               size_t bytes)
104 {
105         bool using_mempool = false;
106
107         bio->bi_iter.bi_size = bytes;
108
109         while (bio->bi_vcnt < DIV_ROUND_UP(bytes, PAGE_SIZE))
110                 bch_bio_alloc_page_pool(c, bio, &using_mempool);
111
112         if (using_mempool)
113                 mutex_unlock(&c->bio_bounce_pages_lock);
114 }
115
116 /* Bios with headers */
117
118 static void bch_submit_wbio(struct cache_set *c, struct bch_write_bio *wbio,
119                             struct cache *ca, const struct bch_extent_ptr *ptr,
120                             bool punt)
121 {
122         wbio->ca                = ca;
123         wbio->submit_time_us    = local_clock_us();
124         wbio->bio.bi_iter.bi_sector = ptr->offset;
125         wbio->bio.bi_bdev       = ca ? ca->disk_sb.bdev : NULL;
126
127         if (!ca)
128                 bcache_io_error(c, &wbio->bio, "device has been removed");
129         else if (punt)
130                 bch_generic_make_request(&wbio->bio, c);
131         else
132                 generic_make_request(&wbio->bio);
133 }
134
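/*
 * Submit a write to each device the extent points to: every pointer except
 * the last gets a clone of the original bio, accounted against the parent via
 * __bio_inc_remaining(); the last pointer reuses the original bio itself.
 */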
135 void bch_submit_wbio_replicas(struct bch_write_bio *wbio, struct cache_set *c,
136                               const struct bkey_i *k, bool punt)
137 {
138         struct bkey_s_c_extent e = bkey_i_to_s_c_extent(k);
139         const struct bch_extent_ptr *ptr;
140         struct bch_write_bio *n;
141         struct cache *ca;
142
143         BUG_ON(c->opts.nochanges);
144
145         wbio->split = false;
146         wbio->c = c;
147
148         extent_for_each_ptr(e, ptr) {
149                 rcu_read_lock();
150                 ca = PTR_CACHE(c, ptr);
151                 if (ca)
152                         percpu_ref_get(&ca->ref);
153                 rcu_read_unlock();
154
155                 if (!ca) {
156                         bch_submit_wbio(c, wbio, ca, ptr, punt);
157                         break;
158                 }
159
160                 if (ptr + 1 < &extent_entry_last(e)->ptr) {
161                         n = to_wbio(bio_clone_fast(&wbio->bio, GFP_NOIO,
162                                                    &ca->replica_set));
163
164                         n->bio.bi_end_io        = wbio->bio.bi_end_io;
165                         n->bio.bi_private       = wbio->bio.bi_private;
166                         n->c                    = c;
167                         n->orig                 = &wbio->bio;
168                         n->bounce               = false;
169                         n->split                = true;
170                         n->put_bio              = true;
171                         n->bio.bi_opf           = wbio->bio.bi_opf;
172                         __bio_inc_remaining(n->orig);
173                 } else {
174                         n = wbio;
175                 }
176
177                 if (!journal_flushes_device(ca))
178                         n->bio.bi_opf |= REQ_FUA;
179
180                 bch_submit_wbio(c, n, ca, ptr, punt);
181         }
182 }
183
184 /* IO errors */
185
186 /* Writes */
187
188 static struct workqueue_struct *index_update_wq(struct bch_write_op *op)
189 {
190         return op->alloc_reserve == RESERVE_MOVINGGC
191                 ? op->c->copygc_wq
192                 : op->c->wq;
193 }
194
195 static void __bch_write(struct closure *);
196
197 static void bch_write_done(struct closure *cl)
198 {
199         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
200
201         BUG_ON(!(op->flags & BCH_WRITE_DONE));
202
203         if (!op->error && (op->flags & BCH_WRITE_FLUSH))
204                 op->error = bch_journal_error(&op->c->journal);
205
206         bch_disk_reservation_put(op->c, &op->res);
207         percpu_ref_put(&op->c->writes);
208         bch_keylist_free(&op->insert_keys, op->inline_keys);
209         closure_return(cl);
210 }
211
212 static u64 keylist_sectors(struct keylist *keys)
213 {
214         struct bkey_i *k;
215         u64 ret = 0;
216
217         for_each_keylist_key(keys, k)
218                 ret += k->k.size;
219
220         return ret;
221 }
222
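/*
 * Default index update function: insert the accumulated keys into the extents
 * btree, starting at the position of the first key, using the write op's disk
 * reservation and journal sequence pointer.
 */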
223 static int bch_write_index_default(struct bch_write_op *op)
224 {
225         struct keylist *keys = &op->insert_keys;
226         struct btree_iter iter;
227         int ret;
228
229         bch_btree_iter_init_intent(&iter, op->c, BTREE_ID_EXTENTS,
230                 bkey_start_pos(&bch_keylist_front(keys)->k));
231
232         ret = bch_btree_insert_list_at(&iter, keys, &op->res,
233                                        NULL, op_journal_seq(op),
234                                        BTREE_INSERT_NOFAIL);
235         bch_btree_iter_unlock(&iter);
236
237         return ret;
238 }
239
240 /**
241  * bch_write_index - after a write, update index to point to new data
242  */
243 static void bch_write_index(struct closure *cl)
244 {
245         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
246         struct cache_set *c = op->c;
247         struct keylist *keys = &op->insert_keys;
248         unsigned i;
249
250         op->flags |= BCH_WRITE_LOOPED;
251
252         if (!bch_keylist_empty(keys)) {
253                 u64 sectors_start = keylist_sectors(keys);
254                 int ret = op->index_update_fn(op);
255
256                 BUG_ON(keylist_sectors(keys) && !ret);
257
258                 op->written += sectors_start - keylist_sectors(keys);
259
260                 if (ret) {
261                         __bcache_io_error(c, "btree IO error %i", ret);
262                         op->error = ret;
263                 }
264         }
265
266         for (i = 0; i < ARRAY_SIZE(op->open_buckets); i++)
267                 if (op->open_buckets[i]) {
268                         bch_open_bucket_put(c,
269                                             c->open_buckets +
270                                             op->open_buckets[i]);
271                         op->open_buckets[i] = 0;
272                 }
273
274         if (!(op->flags & BCH_WRITE_DONE))
275                 continue_at(cl, __bch_write, op->io_wq);
276
277         if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
278                 bch_journal_flush_seq_async(&c->journal,
279                                             *op_journal_seq(op),
280                                             cl);
281                 continue_at(cl, bch_write_done, index_update_wq(op));
282         } else {
283                 continue_at_nobarrier(cl, bch_write_done, NULL);
284         }
285 }
286
287 /**
288  * bch_write_discard - discard range of keys
289  *
290  * Used to implement discard, and to handle the case where a writethrough
291  * write hits a write error on the cache device.
292  */
293 static void bch_write_discard(struct closure *cl)
294 {
295         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
296         struct bio *bio = &op->bio->bio;
297         struct bpos end = op->pos;
298
299         end.offset += bio_sectors(bio);
300
301         op->error = bch_discard(op->c, op->pos, end, op->version,
302                                 &op->res, NULL, NULL);
303 }
304
305 /*
306  * Convert extents to be inserted to discards after an error:
307  */
308 static void bch_write_io_error(struct closure *cl)
309 {
310         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
311
312         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
313                 struct bkey_i *src = bch_keylist_front(&op->insert_keys);
314                 struct bkey_i *dst = bch_keylist_front(&op->insert_keys);
315
316                 /*
317                  * Our data write just errored, which means we've got a bunch
318                  * of keys to insert that point to data that wasn't
319                  * successfully written.
320                  *
321                  * We don't have to insert those keys but we still have to
322                  * invalidate that region of the cache - so, if we just strip
323                  * off all the pointers from the keys we'll accomplish just
324                  * that.
325                  */
326
327                 while (src != op->insert_keys.top) {
328                         struct bkey_i *n = bkey_next(src);
329
330                         set_bkey_val_u64s(&src->k, 0);
331                         src->k.type = KEY_TYPE_DISCARD;
332                         bkey_copy(dst, src);
333
334                         dst = bkey_next(dst);
335                         src = n;
336                 }
337
338                 op->insert_keys.top = dst;
339                 op->flags |= BCH_WRITE_DISCARD;
340         } else {
341                 /* TODO: We could try to recover from this. */
342                 while (!bch_keylist_empty(&op->insert_keys))
343                         bch_keylist_pop_front(&op->insert_keys);
344
345                 op->error = -EIO;
346                 op->flags |= BCH_WRITE_DONE;
347         }
348
349         bch_write_index(cl);
350 }
351
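/*
 * Completion for one replica's data write: on a device error, redirect the
 * write closure to bch_write_io_error(); then release the device ref and any
 * bounce pages, and either complete the parent bio or drop our ref on the
 * write op's closure.
 */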
352 static void bch_write_endio(struct bio *bio)
353 {
354         struct closure *cl = bio->bi_private;
355         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
356         struct bch_write_bio *wbio = to_wbio(bio);
357         struct cache_set *c = wbio->c;
358         struct bio *orig = wbio->orig;
359         struct cache *ca = wbio->ca;
360
361         if (bch_dev_nonfatal_io_err_on(bio->bi_error, ca,
362                                        "data write"))
363                 set_closure_fn(cl, bch_write_io_error, index_update_wq(op));
364
365         bch_account_io_completion_time(ca, wbio->submit_time_us,
366                                        REQ_OP_WRITE);
367         if (ca)
368                 percpu_ref_put(&ca->ref);
369
370         if (bio->bi_error && orig)
371                 orig->bi_error = bio->bi_error;
372
373         if (wbio->bounce)
374                 bch_bio_free_pages_pool(c, bio);
375
376         if (wbio->put_bio)
377                 bio_put(bio);
378
379         if (orig)
380                 bio_endio(orig);
381         else
382                 closure_put(cl);
383 }
384
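/*
 * Build the nonce used for checksumming/encrypting an extent from the key
 * version, the per-extent nonce, the uncompressed size and the compression
 * type.
 */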
385 static struct nonce extent_nonce(struct bversion version,
386                                  unsigned nonce,
387                                  unsigned uncompressed_size,
388                                  unsigned compression_type)
389 {
390         return (struct nonce) {{
391                 [0] = cpu_to_le32((nonce                << 12) |
392                                   (uncompressed_size    << 22)),
393                 [1] = cpu_to_le32(version.lo),
394                 [2] = cpu_to_le32(version.lo >> 32),
395                 [3] = cpu_to_le32(version.hi |
396                                   (compression_type << 24)) ^ BCH_NONCE_EXTENT,
397         }};
398 }
399
400 static void init_append_extent(struct bch_write_op *op,
401                                unsigned compressed_size,
402                                unsigned uncompressed_size,
403                                unsigned compression_type,
404                                unsigned nonce,
405                                struct bch_csum csum, unsigned csum_type,
406                                struct open_bucket *ob)
407 {
408         struct bkey_i_extent *e = bkey_extent_init(op->insert_keys.top);
409
410         op->pos.offset += uncompressed_size;
411         e->k.p = op->pos;
412         e->k.size = uncompressed_size;
413         e->k.version = op->version;
414         bkey_extent_set_cached(&e->k, op->flags & BCH_WRITE_CACHED);
415
416         bch_extent_crc_append(e, compressed_size,
417                               uncompressed_size,
418                               compression_type,
419                               nonce, csum, csum_type);
420
421         bch_alloc_sectors_append_ptrs(op->c, e, op->nr_replicas,
422                                       ob, compressed_size);
423
424         bkey_extent_set_cached(&e->k, (op->flags & BCH_WRITE_CACHED));
425         bch_keylist_push(&op->insert_keys);
426 }
427
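/*
 * Write out as much of @orig as fits in the open bucket @ob, taking one of
 * three paths: already-compressed data (BCH_WRITE_DATA_COMPRESSED) is written
 * as is; data that needs checksumming/compression/encryption is bounced into
 * freshly allocated pages first; otherwise the original bio is submitted
 * directly (split if it doesn't fit). Returns 0 when @orig has been fully
 * consumed, a positive value if more remains for another bucket, or a
 * negative error code.
 */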
428 static int bch_write_extent(struct bch_write_op *op,
429                             struct open_bucket *ob,
430                             struct bio *orig)
431 {
432         struct cache_set *c = op->c;
433         struct bio *bio;
434         struct bch_write_bio *wbio;
435         unsigned key_to_write_offset = op->insert_keys.top_p -
436                 op->insert_keys.keys_p;
437         struct bkey_i *key_to_write;
438         unsigned csum_type = op->csum_type;
439         unsigned compression_type = op->compression_type;
440         int ret;
441
442         /* don't refetch csum type/compression type */
443         barrier();
444
445         /* Need to decompress data? */
446         if ((op->flags & BCH_WRITE_DATA_COMPRESSED) &&
447             (crc_uncompressed_size(NULL, &op->crc) != op->size ||
448              crc_compressed_size(NULL, &op->crc) > ob->sectors_free)) {
449                 int ret;
450
451                 ret = bch_bio_uncompress_inplace(c, orig, op->size, op->crc);
452                 if (ret)
453                         return ret;
454
455                 op->flags &= ~BCH_WRITE_DATA_COMPRESSED;
456         }
457
458         if (op->flags & BCH_WRITE_DATA_COMPRESSED) {
459                 init_append_extent(op,
460                                    crc_compressed_size(NULL, &op->crc),
461                                    crc_uncompressed_size(NULL, &op->crc),
462                                    op->crc.compression_type,
463                                    op->crc.nonce,
464                                    op->crc.csum,
465                                    op->crc.csum_type,
466                                    ob);
467
468                 bio                     = orig;
469                 wbio                    = to_wbio(bio);
470                 wbio->orig              = NULL;
471                 wbio->bounce            = false;
472                 wbio->put_bio           = false;
473                 ret                     = 0;
474         } else if (csum_type != BCH_CSUM_NONE ||
475                    compression_type != BCH_COMPRESSION_NONE) {
476                 /* all units here in bytes */
477                 unsigned total_output = 0, output_available =
478                         min(ob->sectors_free << 9, orig->bi_iter.bi_size);
479                 unsigned crc_nonce = bch_csum_type_is_encryption(csum_type)
480                         ? op->nonce : 0;
481                 struct bch_csum csum;
482                 struct nonce nonce;
483
484                 bio = bio_alloc_bioset(GFP_NOIO,
485                                        DIV_ROUND_UP(output_available, PAGE_SIZE),
486                                        &c->bio_write);
487                 /*
488                  * XXX: can't use mempool for more than
489                  * BCH_COMPRESSED_EXTENT_MAX worth of pages
490                  */
491                 bch_bio_alloc_pages_pool(c, bio, output_available);
492
493                 /* copy WRITE_SYNC flag */
494                 bio->bi_opf             = orig->bi_opf;
495                 wbio                    = to_wbio(bio);
496                 wbio->orig              = NULL;
497                 wbio->bounce            = true;
498                 wbio->put_bio           = true;
499
500                 do {
501                         unsigned fragment_compression_type = compression_type;
502                         size_t dst_len, src_len;
503
504                         bch_bio_compress(c, bio, &dst_len,
505                                          orig, &src_len,
506                                          &fragment_compression_type);
507
508                         BUG_ON(!dst_len || dst_len > bio->bi_iter.bi_size);
509                         BUG_ON(!src_len || src_len > orig->bi_iter.bi_size);
510                         BUG_ON(dst_len & (block_bytes(c) - 1));
511                         BUG_ON(src_len & (block_bytes(c) - 1));
512
513                         swap(bio->bi_iter.bi_size, dst_len);
514                         nonce = extent_nonce(op->version,
515                                              crc_nonce,
516                                              src_len >> 9,
517                                              compression_type);
518
519                         bch_encrypt_bio(c, csum_type, nonce, bio);
520
521                         csum = bch_checksum_bio(c, csum_type, nonce, bio);
522                         swap(bio->bi_iter.bi_size, dst_len);
523
524                         init_append_extent(op,
525                                            dst_len >> 9, src_len >> 9,
526                                            fragment_compression_type,
527                                            crc_nonce, csum, csum_type, ob);
528
529                         total_output += dst_len;
530                         bio_advance(bio, dst_len);
531                         bio_advance(orig, src_len);
532                 } while (bio->bi_iter.bi_size &&
533                          orig->bi_iter.bi_size &&
534                          !bch_keylist_realloc(&op->insert_keys,
535                                               op->inline_keys,
536                                               ARRAY_SIZE(op->inline_keys),
537                                               BKEY_EXTENT_U64s_MAX));
538
539                 BUG_ON(total_output > output_available);
540
541                 memset(&bio->bi_iter, 0, sizeof(bio->bi_iter));
542                 bio->bi_iter.bi_size = total_output;
543
544                 /*
545                  * Free unneeded pages after compressing:
546                  */
547                 while (bio->bi_vcnt * PAGE_SIZE >
548                        round_up(bio->bi_iter.bi_size, PAGE_SIZE))
549                         mempool_free(bio->bi_io_vec[--bio->bi_vcnt].bv_page,
550                                      &c->bio_bounce_pages);
551
552                 ret = orig->bi_iter.bi_size != 0;
553         } else {
554                 bio = bio_next_split(orig, ob->sectors_free, GFP_NOIO,
555                                      &c->bio_write);
556
557                 wbio                    = to_wbio(bio);
558                 wbio->orig              = NULL;
559                 wbio->bounce            = false;
560                 wbio->put_bio           = bio != orig;
561
562                 init_append_extent(op, bio_sectors(bio), bio_sectors(bio),
563                                    compression_type, 0,
564                                    (struct bch_csum) { 0 }, csum_type, ob);
565
566                 ret = bio != orig;
567         }
568
569         bio->bi_end_io  = bch_write_endio;
570         bio->bi_private = &op->cl;
571         bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
572
573         closure_get(bio->bi_private);
574
575         /* might have done a realloc... */
576
577         key_to_write = (void *) (op->insert_keys.keys_p + key_to_write_offset);
578
579         bch_check_mark_super(c, key_to_write, false);
580
581 #ifndef CONFIG_BCACHE_NO_IO
582         bch_submit_wbio_replicas(to_wbio(bio), c, key_to_write, false);
583 #else
584         to_wbio(bio)->ca = NULL;
585         bio_endio(bio);
586 #endif
587         return ret;
588 }
589
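/*
 * Main write loop: allocate an open bucket, write as much of the bio into it
 * as fits via bch_write_extent(), and keep going until the bio is fully
 * written, flushing accumulated keys to the index (bch_write_index) whenever
 * we run out of open bucket slots or keylist space.
 */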
590 static void __bch_write(struct closure *cl)
591 {
592         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
593         struct cache_set *c = op->c;
594         struct bio *bio = &op->bio->bio;
595         unsigned open_bucket_nr = 0;
596         struct open_bucket *b;
597         int ret;
598
599         memset(op->open_buckets, 0, sizeof(op->open_buckets));
600
601         if (op->flags & BCH_WRITE_DISCARD) {
602                 op->flags |= BCH_WRITE_DONE;
603                 bch_write_discard(cl);
604                 bio_put(bio);
605                 continue_at(cl, bch_write_done, index_update_wq(op));
606         }
607
608         /*
609          * Journal writes are marked REQ_PREFLUSH; if the original write was a
610          * flush, it'll wait on the journal write.
611          */
612         bio->bi_opf &= ~(REQ_PREFLUSH|REQ_FUA);
613
614         do {
615                 EBUG_ON(bio->bi_iter.bi_sector != op->pos.offset);
616                 EBUG_ON(!bio_sectors(bio));
617
618                 if (open_bucket_nr == ARRAY_SIZE(op->open_buckets))
619                         continue_at(cl, bch_write_index, index_update_wq(op));
620
621                 /* for the device pointers and 1 for the checksum */
622                 if (bch_keylist_realloc(&op->insert_keys,
623                                         op->inline_keys,
624                                         ARRAY_SIZE(op->inline_keys),
625                                         BKEY_EXTENT_U64s_MAX))
626                         continue_at(cl, bch_write_index, index_update_wq(op));
627
628                 b = bch_alloc_sectors_start(c, op->wp, op->nr_replicas,
629                         op->alloc_reserve,
630                         (op->flags & BCH_WRITE_ALLOC_NOWAIT) ? NULL : cl);
631                 EBUG_ON(!b);
632
633                 if (unlikely(IS_ERR(b))) {
634                         if (unlikely(PTR_ERR(b) != -EAGAIN)) {
635                                 ret = PTR_ERR(b);
636                                 goto err;
637                         }
638
639                         /*
640                          * If we already have some keys, must insert them first
641                          * before allocating another open bucket. We only hit
642                          * this case if open_bucket_nr > 1.
643                          */
644                         if (!bch_keylist_empty(&op->insert_keys))
645                                 continue_at(cl, bch_write_index,
646                                             index_update_wq(op));
647
648                         /*
649                          * If we've looped, we're running out of a workqueue -
650                          * not the bch_write() caller's context - and we don't
651                          * want to block the workqueue:
652                          */
653                         if (op->flags & BCH_WRITE_LOOPED)
654                                 continue_at(cl, __bch_write, op->io_wq);
655
656                         /*
657                          * Otherwise, we do want to block the caller on alloc
658                          * failure instead of letting it queue up more and more
659                          * writes:
660                          * XXX: this technically needs a try_to_freeze() -
661                          * except that that's not safe because caller may have
662                          * issued other IO... hmm..
663                          */
664                         closure_sync(cl);
665                         continue;
666                 }
667
668                 BUG_ON(b - c->open_buckets == 0 ||
669                        b - c->open_buckets > U8_MAX);
670                 op->open_buckets[open_bucket_nr++] = b - c->open_buckets;
671
672                 ret = bch_write_extent(op, b, bio);
673
674                 bch_alloc_sectors_done(c, op->wp, b);
675
676                 if (ret < 0)
677                         goto err;
678         } while (ret);
679
680         op->flags |= BCH_WRITE_DONE;
681         continue_at(cl, bch_write_index, index_update_wq(op));
682 err:
683         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
684                 /*
685                  * If we were writing cached data, not doing the write is fine
686                  * so long as we discard whatever would have been overwritten -
687                  * then it's equivalent to doing the write and immediately
688                  * reclaiming it.
689                  */
690
691                 bch_write_discard(cl);
692         } else {
693                 /*
694                  * Right now we can only error here if we went RO - the
695                  * allocation failed, but we already checked for -ENOSPC when we
696                  * got our reservation.
697                  *
698                  * XXX capacity might have changed, but we don't check for that
699                  * yet:
700                  */
701                 op->error = ret;
702         }
703
704         op->flags |= BCH_WRITE_DONE;
705
706         /*
707          * No reason not to insert keys for whatever data was successfully
708          * written (especially for a cmpxchg operation that's moving data
709          * around)
710          */
711         continue_at(cl, !bch_keylist_empty(&op->insert_keys)
712                     ? bch_write_index
713                     : bch_write_done, index_update_wq(op));
714 }
715
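/*
 * Timer callback for throttled foreground writes: wake every write whose
 * delay has expired (or all of them if the filesystem is going read only or
 * shutting down), dropping the ref the wait list held on each one, and re-arm
 * the timer for the first write that still has to wait.
 */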
716 void bch_wake_delayed_writes(unsigned long data)
717 {
718         struct cache_set *c = (void *) data;
719         struct bch_write_op *op;
720         unsigned long flags;
721
722         spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
723
724         while ((op = c->write_wait_head)) {
725                 if (!test_bit(BCH_FS_RO, &c->flags) &&
726                     !test_bit(BCH_FS_STOPPING, &c->flags) &&
727                     time_after(op->expires, jiffies)) {
728                         mod_timer(&c->foreground_write_wakeup, op->expires);
729                         break;
730                 }
731
732                 c->write_wait_head = op->next;
733                 if (!c->write_wait_head)
734                         c->write_wait_tail = NULL;
735
736                 closure_put(&op->cl);
737         }
738
739         spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
740 }
741
742 /**
743  * bch_write - handle a write to a cache device or flash only volume
744  *
745  * This is the starting point for any data to end up in a cache device; it could
746  * be from a normal write, or a writeback write, or a write to a flash only
747  * volume - it's also used by the moving garbage collector to compact data in
748  * mostly empty buckets.
749  *
750  * It first writes the data to the cache, creating a list of keys to be inserted
751  * (if the data won't fit in a single open bucket, there will be multiple keys);
752  * after the data is written it calls bch_journal, and after the keys have been
753  * added to the next journal write they're inserted into the btree.
754  *
755  * It inserts the data in op->bio; op->pos.offset is used for the key offset,
756  * and op->pos.inode is used for the key inode.
757  *
758  * If the BCH_WRITE_DISCARD flag is set, instead of inserting the data it
759  * invalidates the region of the cache represented by op->bio and op->pos.
760  */
761 void bch_write(struct closure *cl)
762 {
763         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
764         struct bio *bio = &op->bio->bio;
765         struct cache_set *c = op->c;
766         u64 inode = op->pos.inode;
767
768         trace_bcache_write(c, inode, bio,
769                            !(op->flags & BCH_WRITE_CACHED),
770                            op->flags & BCH_WRITE_DISCARD);
771
772         if (c->opts.nochanges ||
773             !percpu_ref_tryget(&c->writes)) {
774                 __bcache_io_error(c, "read only");
775                 op->error = -EROFS;
776                 bch_disk_reservation_put(c, &op->res);
777                 closure_return(cl);
778         }
779
780         if (bversion_zero(op->version) &&
781             bch_csum_type_is_encryption(op->csum_type))
782                 op->version.lo =
783                         atomic64_inc_return(&c->key_version) + 1;
784
785         if (!(op->flags & BCH_WRITE_DISCARD))
786                 bch_increment_clock(c, bio_sectors(bio), WRITE);
787
788         if (!(op->flags & BCH_WRITE_DISCARD))
789                 bch_mark_foreground_write(c, bio_sectors(bio));
790         else
791                 bch_mark_discard(c, bio_sectors(bio));
792
793         /* Don't call bch_ratelimit_delay() if rate is >= 1 GB/sec */
794
795         if (c->foreground_write_ratelimit_enabled &&
796             c->foreground_write_pd.rate.rate < (1 << 30) &&
797             !(op->flags & BCH_WRITE_DISCARD) && op->wp->throttle) {
798                 unsigned long flags;
799                 u64 delay;
800
801                 spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
802                 bch_ratelimit_increment(&c->foreground_write_pd.rate,
803                                         bio->bi_iter.bi_size);
804
805                 delay = bch_ratelimit_delay(&c->foreground_write_pd.rate);
806
807                 if (delay >= HZ / 100) {
808                         trace_bcache_write_throttle(c, inode, bio, delay);
809
810                         closure_get(&op->cl); /* list takes a ref */
811
812                         op->expires = jiffies + delay;
813                         op->next = NULL;
814
815                         if (c->write_wait_tail)
816                                 c->write_wait_tail->next = op;
817                         else
818                                 c->write_wait_head = op;
819                         c->write_wait_tail = op;
820
821                         if (!timer_pending(&c->foreground_write_wakeup))
822                                 mod_timer(&c->foreground_write_wakeup,
823                                           op->expires);
824
825                         spin_unlock_irqrestore(&c->foreground_write_pd_lock,
826                                                flags);
827                         continue_at(cl, __bch_write, index_update_wq(op));
828                 }
829
830                 spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
831         }
832
833         continue_at_nobarrier(cl, __bch_write, NULL);
834 }
835
836 void bch_write_op_init(struct bch_write_op *op, struct cache_set *c,
837                        struct bch_write_bio *bio, struct disk_reservation res,
838                        struct write_point *wp, struct bpos pos,
839                        u64 *journal_seq, unsigned flags)
840 {
841         EBUG_ON(res.sectors && !res.nr_replicas);
842
843         op->c           = c;
844         op->io_wq       = index_update_wq(op);
845         op->bio         = bio;
846         op->written     = 0;
847         op->error       = 0;
848         op->flags       = flags;
849         op->csum_type   = bch_data_checksum_type(c);
850         op->compression_type = c->opts.compression;
851         op->nr_replicas = res.nr_replicas;
852         op->alloc_reserve = RESERVE_NONE;
853         op->nonce       = 0;
854         op->pos         = pos;
855         op->version     = ZERO_VERSION;
856         op->res         = res;
857         op->wp          = wp;
858
859         if (journal_seq) {
860                 op->journal_seq_p = journal_seq;
861                 op->flags |= BCH_WRITE_JOURNAL_SEQ_PTR;
862         } else {
863                 op->journal_seq = 0;
864         }
865
866         op->index_update_fn = bch_write_index_default;
867
868         bch_keylist_init(&op->insert_keys,
869                          op->inline_keys,
870                          ARRAY_SIZE(op->inline_keys));
871
872         if (version_stress_test(c))
873                 get_random_bytes(&op->version, sizeof(op->version));
874 }
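
/*
 * Typical usage (sketch only - the actual arguments depend on the caller):
 *
 *      bch_write_op_init(&op, c, wbio, res, wp, pos, &journal_seq, 0);
 *      closure_call(&op.cl, bch_write, NULL, &parent_cl);
 *
 * i.e. initialize the op, then kick off bch_write() as a closure; see the
 * promote path in __bch_read_endio() for an in-tree example.
 */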
875
876 /* Discard */
877
878 /* bch_discard - discard a range of keys from start to end.
879  * @c           cache set
880  * @start       start position
881  *              NOTE: discard starts at bkey_start_offset(start)
882  * @end         end position
883  *              NOTE: discard ends at end.offset
884  * @version     version of discard (0ULL if none)
885  *
886  * Returns:
887  *       0 on success
888  *      <0 on error
889  *
890  * XXX: this needs to be refactored with inode_truncate, or more
891  *      appropriately inode_truncate should call this
892  */
893 int bch_discard(struct cache_set *c, struct bpos start,
894                 struct bpos end, struct bversion version,
895                 struct disk_reservation *disk_res,
896                 struct extent_insert_hook *hook,
897                 u64 *journal_seq)
898 {
899         return bch_btree_delete_range(c, BTREE_ID_EXTENTS, start, end, version,
900                                       disk_res, hook, journal_seq);
901 }
902
903 /* Cache promotion on read */
904
905 struct cache_promote_op {
906         struct closure          cl;
907         struct migrate_write    write;
908         struct bio_vec          bi_inline_vecs[0]; /* must be last */
909 };
910
911 /* Read */
912
913 static int bio_checksum_uncompress(struct cache_set *c,
914                                    struct bch_read_bio *rbio)
915 {
916         struct bio *src = &rbio->bio;
917         struct bio *dst = &bch_rbio_parent(rbio)->bio;
918         struct bvec_iter dst_iter = rbio->parent_iter;
919         struct nonce nonce = extent_nonce(rbio->version,
920                                 rbio->crc.nonce,
921                                 crc_uncompressed_size(NULL, &rbio->crc),
922                                 rbio->crc.compression_type);
923         struct bch_csum csum;
924         int ret = 0;
925
926         /*
927          * reset iterator for checksumming and copying bounced data: here we've
928          * set rbio->crc.compressed_size to the amount of data we actually read,
929          * which was not necessarily the full extent if we were only bouncing
930          * in order to promote
931          */
932         if (rbio->bounce) {
933                 src->bi_iter.bi_size    = crc_compressed_size(NULL, &rbio->crc) << 9;
934                 src->bi_iter.bi_idx     = 0;
935                 src->bi_iter.bi_bvec_done = 0;
936         } else {
937                 src->bi_iter = rbio->parent_iter;
938         }
939
940         csum = bch_checksum_bio(c, rbio->crc.csum_type, nonce, src);
941         if (bch_dev_nonfatal_io_err_on(bch_crc_cmp(rbio->crc.csum, csum), rbio->ca,
942                         "data checksum error, inode %llu offset %llu: expected %0llx%0llx got %0llx%0llx (type %u)",
943                         rbio->inode, (u64) rbio->parent_iter.bi_sector << 9,
944                         rbio->crc.csum.hi, rbio->crc.csum.lo, csum.hi, csum.lo,
945                         rbio->crc.csum_type))
946                 ret = -EIO;
947
948         /*
949          * If there was a checksum error, still copy the data back - unless it
950          * was compressed, we don't want to decompress bad data:
951          */
952         if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
953                 if (!ret) {
954                         bch_encrypt_bio(c, rbio->crc.csum_type, nonce, src);
955                         ret = bch_bio_uncompress(c, src, dst,
956                                                  dst_iter, rbio->crc);
957                         if (ret)
958                                 __bcache_io_error(c, "decompression error");
959                 }
960         } else if (rbio->bounce) {
961                 bio_advance(src, rbio->crc.offset << 9);
962
963                 /* don't need to decrypt the entire bio: */
964                 BUG_ON(src->bi_iter.bi_size < dst_iter.bi_size);
965                 src->bi_iter.bi_size = dst_iter.bi_size;
966
967                 nonce = nonce_add(nonce, rbio->crc.offset << 9);
968
969                 bch_encrypt_bio(c, rbio->crc.csum_type,
970                                 nonce, src);
971
972                 bio_copy_data_iter(dst, dst_iter,
973                                    src, src->bi_iter);
974         } else {
975                 bch_encrypt_bio(c, rbio->crc.csum_type, nonce, src);
976         }
977
978         return ret;
979 }
980
981 static void bch_rbio_free(struct cache_set *c, struct bch_read_bio *rbio)
982 {
983         struct bio *bio = &rbio->bio;
984
985         BUG_ON(rbio->ca);
986         BUG_ON(!rbio->split);
987
988         if (rbio->promote)
989                 kfree(rbio->promote);
990         if (rbio->bounce)
991                 bch_bio_free_pages_pool(c, bio);
992
993         bio_put(bio);
994 }
995
996 static void bch_rbio_done(struct cache_set *c, struct bch_read_bio *rbio)
997 {
998         struct bio *orig = &bch_rbio_parent(rbio)->bio;
999
1000         percpu_ref_put(&rbio->ca->ref);
1001         rbio->ca = NULL;
1002
1003         if (rbio->split) {
1004                 if (rbio->bio.bi_error)
1005                         orig->bi_error = rbio->bio.bi_error;
1006
1007                 bio_endio(orig);
1008                 bch_rbio_free(c, rbio);
1009         } else {
1010                 if (rbio->promote)
1011                         kfree(rbio->promote);
1012
1013                 orig->bi_end_io = rbio->orig_bi_end_io;
1014                 bio_endio_nodec(orig);
1015         }
1016 }
1017
1018 /*
1019  * Decide if we want to retry the read - if so, requeue it on the read retry
1020  * list; otherwise report the error to the parent bio
1021  */
1022 static void bch_read_error_maybe_retry(struct cache_set *c,
1023                                        struct bch_read_bio *rbio,
1024                                        int error)
1025 {
1026         unsigned long flags;
1027
1028         if ((error == -EINTR) &&
1029             (rbio->flags & BCH_READ_RETRY_IF_STALE)) {
1030                 atomic_long_inc(&c->cache_read_races);
1031                 goto retry;
1032         }
1033
1034         if (error == -EIO) {
1035                 /* io error - do we have another replica? */
1036         }
1037
1038         bch_rbio_parent(rbio)->bio.bi_error = error;
1039         bch_rbio_done(c, rbio);
1040         return;
1041 retry:
1042         percpu_ref_put(&rbio->ca->ref);
1043         rbio->ca = NULL;
1044
1045         spin_lock_irqsave(&c->read_retry_lock, flags);
1046         bio_list_add(&c->read_retry_list, &rbio->bio);
1047         spin_unlock_irqrestore(&c->read_retry_lock, flags);
1048         queue_work(c->wq, &c->read_retry_work);
1049 }
1050
1051 static void cache_promote_done(struct closure *cl)
1052 {
1053         struct cache_promote_op *op =
1054                 container_of(cl, struct cache_promote_op, cl);
1055
1056         bch_bio_free_pages_pool(op->write.op.c, &op->write.wbio.bio);
1057         kfree(op);
1058 }
1059
1060 /* Inner part that may run in process context */
1061 static void __bch_read_endio(struct cache_set *c, struct bch_read_bio *rbio)
1062 {
1063         int ret;
1064
1065         ret = bio_checksum_uncompress(c, rbio);
1066         if (ret) {
1067                 bch_read_error_maybe_retry(c, rbio, ret);
1068                 return;
1069         }
1070
1071         if (rbio->promote &&
1072             !test_bit(BCH_FS_RO, &c->flags) &&
1073             !test_bit(BCH_FS_STOPPING, &c->flags)) {
1074                 struct cache_promote_op *promote = rbio->promote;
1075                 struct closure *cl = &promote->cl;
1076
1077                 BUG_ON(!rbio->split || !rbio->bounce);
1078
1079                 /* we now own pages: */
1080                 swap(promote->write.wbio.bio.bi_vcnt, rbio->bio.bi_vcnt);
1081                 rbio->promote = NULL;
1082
1083                 bch_rbio_done(c, rbio);
1084
1085                 closure_init(cl, &c->cl);
1086                 closure_call(&promote->write.op.cl, bch_write, c->wq, cl);
1087                 closure_return_with_destructor(cl, cache_promote_done);
1088         } else {
1089                 bch_rbio_done(c, rbio);
1090         }
1091 }
1092
1093 void bch_bio_decompress_work(struct work_struct *work)
1094 {
1095         struct bio_decompress_worker *d =
1096                 container_of(work, struct bio_decompress_worker, work);
1097         struct llist_node *list, *next;
1098         struct bch_read_bio *rbio;
1099
1100         while ((list = llist_del_all(&d->bio_list)))
1101                 for (list = llist_reverse_order(list);
1102                      list;
1103                      list = next) {
1104                         next = llist_next(list);
1105                         rbio = container_of(list, struct bch_read_bio, list);
1106
1107                         __bch_read_endio(d->c, rbio);
1108                 }
1109 }
1110
1111 static void bch_read_endio(struct bio *bio)
1112 {
1113         struct bch_read_bio *rbio =
1114                 container_of(bio, struct bch_read_bio, bio);
1115         struct cache_set *c = rbio->ca->set;
1116         int stale = ((rbio->flags & BCH_READ_RETRY_IF_STALE) && race_fault()) ||
1117                 ptr_stale(rbio->ca, &rbio->ptr) ? -EINTR : 0;
1118         int error = bio->bi_error ?: stale;
1119
1120         bch_account_io_completion_time(rbio->ca, rbio->submit_time_us, REQ_OP_READ);
1121
1122         bch_dev_nonfatal_io_err_on(bio->bi_error, rbio->ca, "data read");
1123
1124         if (error) {
1125                 bch_read_error_maybe_retry(c, rbio, error);
1126                 return;
1127         }
1128
1129         if (rbio->crc.compression_type != BCH_COMPRESSION_NONE ||
1130             bch_csum_type_is_encryption(rbio->crc.csum_type)) {
1131                 struct bio_decompress_worker *d;
1132
1133                 preempt_disable();
1134                 d = this_cpu_ptr(c->bio_decompress_worker);
1135                 llist_add(&rbio->list, &d->bio_list);
1136                 queue_work(system_unbound_wq, &d->work);
1137                 preempt_enable();
1138         } else {
1139                 __bch_read_endio(c, rbio);
1140         }
1141 }
1142
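/*
 * Issue a read for the part of extent @k covered by @iter: decide whether the
 * data needs to be bounced into newly allocated pages (for checksums,
 * compression, encryption, or to feed a cache promotion), whether to clone or
 * reuse the original bio, and set up an optional promotion write to run once
 * the read completes.
 */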
1143 void bch_read_extent_iter(struct cache_set *c, struct bch_read_bio *orig,
1144                           struct bvec_iter iter, struct bkey_s_c k,
1145                           struct extent_pick_ptr *pick, unsigned flags)
1146 {
1147         struct bch_read_bio *rbio;
1148         struct cache_promote_op *promote_op = NULL;
1149         unsigned skip = iter.bi_sector - bkey_start_offset(k.k);
1150         bool bounce = false, split, read_full = false;
1151
1152         EBUG_ON(bkey_start_offset(k.k) > iter.bi_sector ||
1153                 k.k->p.offset < bvec_iter_end_sector(iter));
1154
1155         /* only promote if we're not reading from the fastest tier: */
1156
1157         /*
1158          * XXX: multiple promotes can race with each other, wastefully. Keep a
1159          * list of outstanding promotes?
1160          */
1161         if ((flags & BCH_READ_PROMOTE) && pick->ca->mi.tier) {
1162                 /*
1163                  * biovec needs to be big enough to hold decompressed data, if
1164                  * the bch_write_extent() has to decompress/recompress it:
1165                  */
1166                 unsigned sectors =
1167                         max_t(unsigned, k.k->size,
1168                               crc_uncompressed_size(NULL, &pick->crc));
1169                 unsigned pages = DIV_ROUND_UP(sectors, PAGE_SECTORS);
1170
1171                 promote_op = kmalloc(sizeof(*promote_op) +
1172                                 sizeof(struct bio_vec) * pages, GFP_NOIO);
1173                 if (promote_op) {
1174                         struct bio *promote_bio = &promote_op->write.wbio.bio;
1175
1176                         bio_init(promote_bio);
1177                         promote_bio->bi_max_vecs = pages;
1178                         promote_bio->bi_io_vec  = promote_bio->bi_inline_vecs;
1179                         bounce = true;
1180                         /* could also set read_full */
1181                 }
1182         }
1183
1184         /*
1185          * note: if compression_type and crc_type both == none, then
1186          * compressed/uncompressed size is zero
1187          */
1188         if (pick->crc.compression_type != BCH_COMPRESSION_NONE ||
1189             (pick->crc.csum_type != BCH_CSUM_NONE &&
1190              (bvec_iter_sectors(iter) != crc_uncompressed_size(NULL, &pick->crc) ||
1191               (flags & BCH_READ_FORCE_BOUNCE)))) {
1192                 read_full = true;
1193                 bounce = true;
1194         }
1195
1196         if (bounce) {
1197                 unsigned sectors = read_full
1198                         ? (crc_compressed_size(NULL, &pick->crc) ?: k.k->size)
1199                         : bvec_iter_sectors(iter);
1200
1201                 rbio = container_of(bio_alloc_bioset(GFP_NOIO,
1202                                         DIV_ROUND_UP(sectors, PAGE_SECTORS),
1203                                         &c->bio_read_split),
1204                                     struct bch_read_bio, bio);
1205
1206                 bch_bio_alloc_pages_pool(c, &rbio->bio, sectors << 9);
1207                 split = true;
1208         } else if (!(flags & BCH_READ_MAY_REUSE_BIO) ||
1209                    !(flags & BCH_READ_IS_LAST)) {
1210                 /*
1211                  * Have to clone if there were any splits, due to error
1212                  * reporting issues (if a split errored, and retrying didn't
1213                  * work, when it reports the error to its parent (us) we don't
1214                  * know if the error was from our bio, and we should retry, or
1215                  * from the whole bio, in which case we don't want to retry and
1216                  * lose the error)
1217                  */
1218                 rbio = container_of(bio_clone_fast(&orig->bio,
1219                                         GFP_NOIO, &c->bio_read_split),
1220                                     struct bch_read_bio, bio);
1221                 rbio->bio.bi_iter = iter;
1222                 split = true;
1223         } else {
1224                 rbio = orig;
1225                 rbio->bio.bi_iter = iter;
1226                 split = false;
1227                 BUG_ON(bio_flagged(&rbio->bio, BIO_CHAIN));
1228         }
1229
1230         if (!(flags & BCH_READ_IS_LAST))
1231                 __bio_inc_remaining(&orig->bio);
1232
1233         if (split)
1234                 rbio->parent    = orig;
1235         else
1236                 rbio->orig_bi_end_io = orig->bio.bi_end_io;
1237         rbio->parent_iter       = iter;
1238
1239         rbio->inode             = k.k->p.inode;
1240         rbio->flags             = flags;
1241         rbio->bounce            = bounce;
1242         rbio->split             = split;
1243         rbio->version           = k.k->version;
1244         rbio->crc               = pick->crc;
1245         /*
1246          * crc.compressed_size will be 0 if there wasn't any checksum
1247          * information; we also need to stash the original size of the bio if we
1248          * bounced (which isn't necessarily the original key size, if we bounced
1249          * only for promoting)
1250          */
1251         rbio->crc._compressed_size = bio_sectors(&rbio->bio) - 1;
1252         rbio->ptr               = pick->ptr;
1253         rbio->ca                = pick->ca;
1254         rbio->promote           = promote_op;
1255
1256         rbio->bio.bi_bdev       = pick->ca->disk_sb.bdev;
1257         rbio->bio.bi_opf        = orig->bio.bi_opf;
1258         rbio->bio.bi_iter.bi_sector = pick->ptr.offset;
1259         rbio->bio.bi_end_io     = bch_read_endio;
1260
1261         if (promote_op) {
1262                 struct bio *promote_bio = &promote_op->write.wbio.bio;
1263
1264                 promote_bio->bi_iter = rbio->bio.bi_iter;
1265                 memcpy(promote_bio->bi_io_vec, rbio->bio.bi_io_vec,
1266                        sizeof(struct bio_vec) * rbio->bio.bi_vcnt);
1267
1268                 bch_migrate_write_init(c, &promote_op->write,
1269                                        &c->promote_write_point,
1270                                        k, NULL,
1271                                        BCH_WRITE_ALLOC_NOWAIT|
1272                                        BCH_WRITE_CACHED);
1273                 promote_op->write.promote = true;
1274
1275                 if (rbio->crc.compression_type) {
1276                         promote_op->write.op.flags |= BCH_WRITE_DATA_COMPRESSED;
1277                         promote_op->write.op.crc = rbio->crc;
1278                         promote_op->write.op.size = k.k->size;
1279                 } else if (read_full) {
1280                         /*
1281                          * Adjust bio to correspond to _live_ portion of @k -
1282                          * which might be less than what we're actually reading:
1283                          */
1284                         bio_advance(promote_bio, rbio->crc.offset << 9);
1285                         BUG_ON(bio_sectors(promote_bio) < k.k->size);
1286                         promote_bio->bi_iter.bi_size = k.k->size << 9;
1287                 } else {
1288                         /*
1289                          * Set insert pos to correspond to what we're actually
1290                          * reading:
1291                          */
1292                         promote_op->write.op.pos.offset = iter.bi_sector;
1293                 }
1294
1295                 promote_bio->bi_iter.bi_sector =
1296                         promote_op->write.op.pos.offset;
1297         }
1298
1299         /* _after_ promote stuff has looked at rbio->crc.offset */
1300         if (read_full)
1301                 rbio->crc.offset += skip;
1302         else
1303                 rbio->bio.bi_iter.bi_sector += skip;
1304
1305         rbio->submit_time_us = local_clock_us();
1306
1307 #ifndef CONFIG_BCACHE_NO_IO
1308         generic_make_request(&rbio->bio);
1309 #else
1310         bio_endio(&rbio->bio);
1311 #endif
1312 }
1313
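/*
 * Walk the extents btree over the range described by @bvec_iter, issuing one
 * read per extent found and zero filling any holes, until the whole bio has
 * been handled or a btree error occurs.
 */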
1314 static void bch_read_iter(struct cache_set *c, struct bch_read_bio *rbio,
1315                           struct bvec_iter bvec_iter, u64 inode,
1316                           unsigned flags)
1317 {
1318         struct bio *bio = &rbio->bio;
1319         struct btree_iter iter;
1320         struct bkey_s_c k;
1321         int ret;
1322
1323         for_each_btree_key_with_holes(&iter, c, BTREE_ID_EXTENTS,
1324                                       POS(inode, bvec_iter.bi_sector), k) {
1325                 BKEY_PADDED(k) tmp;
1326                 struct extent_pick_ptr pick;
1327                 unsigned bytes, sectors;
1328                 bool is_last;
1329
1330                 /*
1331                  * Unlock the iterator while the btree node's lock is still in
1332                  * cache, before doing the IO:
1333                  */
1334                 bkey_reassemble(&tmp.k, k);
1335                 k = bkey_i_to_s_c(&tmp.k);
1336                 bch_btree_iter_unlock(&iter);
1337
1338                 bch_extent_pick_ptr(c, k, &pick);
1339                 if (IS_ERR(pick.ca)) {
1340                         bcache_io_error(c, bio, "no device to read from");
1341                         bio_endio(bio);
1342                         return;
1343                 }
1344
1345                 sectors = min_t(u64, k.k->p.offset,
1346                                 bvec_iter_end_sector(bvec_iter)) -
1347                         bvec_iter.bi_sector;
1348                 bytes = sectors << 9;
1349                 is_last = bytes == bvec_iter.bi_size;
1350                 swap(bvec_iter.bi_size, bytes);
1351
1352                 if (is_last)
1353                         flags |= BCH_READ_IS_LAST;
1354
1355                 if (pick.ca) {
1356                         PTR_BUCKET(pick.ca, &pick.ptr)->read_prio =
1357                                 c->prio_clock[READ].hand;
1358
1359                         bch_read_extent_iter(c, rbio, bvec_iter,
1360                                              k, &pick, flags);
1361
1362                         flags &= ~BCH_READ_MAY_REUSE_BIO;
1363                 } else {
1364                         zero_fill_bio_iter(bio, bvec_iter);
1365
1366                         if (is_last)
1367                                 bio_endio(bio);
1368                 }
1369
1370                 if (is_last)
1371                         return;
1372
1373                 swap(bvec_iter.bi_size, bytes);
1374                 bio_advance_iter(bio, &bvec_iter, bytes);
1375         }
1376
1377         /*
1378          * If we get here, it better have been because there was an error
1379          * reading a btree node
1380          */
1381         ret = bch_btree_iter_unlock(&iter);
1382         BUG_ON(!ret);
1383         bcache_io_error(c, bio, "btree IO error %i", ret);
1384         bio_endio(bio);
1385 }
1386
1387 void bch_read(struct cache_set *c, struct bch_read_bio *bio, u64 inode)
1388 {
1389         bch_increment_clock(c, bio_sectors(&bio->bio), READ);
1390
1391         bch_read_iter(c, bio, bio->bio.bi_iter, inode,
1392                       BCH_READ_FORCE_BOUNCE|
1393                       BCH_READ_RETRY_IF_STALE|
1394                       BCH_READ_PROMOTE|
1395                       BCH_READ_MAY_REUSE_BIO);
1396 }
1397 EXPORT_SYMBOL(bch_read);
1398
1399 /**
1400  * bch_read_retry - re-submit a bio originally from bch_read()
1401  */
1402 static void bch_read_retry(struct cache_set *c, struct bch_read_bio *rbio)
1403 {
1404         struct bch_read_bio *parent = bch_rbio_parent(rbio);
1405         struct bvec_iter iter = rbio->parent_iter;
1406         u64 inode = rbio->inode;
1407
1408         trace_bcache_read_retry(&rbio->bio);
1409
1410         if (rbio->split)
1411                 bch_rbio_free(c, rbio);
1412         else
1413                 rbio->bio.bi_end_io = rbio->orig_bi_end_io;
1414
1415         bch_read_iter(c, parent, iter, inode,
1416                       BCH_READ_FORCE_BOUNCE|
1417                       BCH_READ_RETRY_IF_STALE|
1418                       BCH_READ_PROMOTE);
1419 }
1420
1421 void bch_read_retry_work(struct work_struct *work)
1422 {
1423         struct cache_set *c = container_of(work, struct cache_set,
1424                                            read_retry_work);
1425         struct bch_read_bio *rbio;
1426         struct bio *bio;
1427         unsigned long flags;
1428
1429         while (1) {
1430                 spin_lock_irqsave(&c->read_retry_lock, flags);
1431                 bio = bio_list_pop(&c->read_retry_list);
1432                 spin_unlock_irqrestore(&c->read_retry_lock, flags);
1433
1434                 if (!bio)
1435                         break;
1436
1437                 rbio = container_of(bio, struct bch_read_bio, bio);
1438                 bch_read_retry(c, rbio);
1439         }
1440 }