[bcachefs-tools-debian] / libbcache / io.c
1 /*
2  * Some low level IO code, and hacks for various block layer limitations
3  *
4  * Copyright 2010, 2011 Kent Overstreet <kent.overstreet@gmail.com>
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcache.h"
9 #include "alloc.h"
10 #include "bset.h"
11 #include "btree_update.h"
12 #include "buckets.h"
13 #include "checksum.h"
14 #include "compress.h"
15 #include "clock.h"
16 #include "debug.h"
17 #include "error.h"
18 #include "extents.h"
19 #include "io.h"
20 #include "journal.h"
21 #include "keylist.h"
22 #include "move.h"
23 #include "notify.h"
24 #include "stats.h"
25 #include "super-io.h"
26
27 #include <linux/blkdev.h>
28 #include <linux/random.h>
29
30 #include <trace/events/bcache.h>
31
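/*
 * Descriptive note: effectively an open-coded bio_inc_remaining() - set
 * BIO_CHAIN and bump __bi_remaining so that one extra bio_endio() is needed
 * before the bio's completion actually runs.  Used below to make a parent
 * bio wait for splits/clones submitted on its behalf.
 */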
32 static inline void __bio_inc_remaining(struct bio *bio)
33 {
34         bio_set_flag(bio, BIO_CHAIN);
35         smp_mb__before_atomic();
36         atomic_inc(&bio->__bi_remaining);
37 }
38
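/*
 * If current->bio_list is non-NULL we're already inside generic_make_request()
 * somewhere up the stack, so submitting directly could recurse arbitrarily
 * deep or deadlock against the same request queue; instead, punt the bio to
 * bcache_io_wq and let bch_bio_submit_work() resubmit it from process context.
 */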
39 void bch_generic_make_request(struct bio *bio, struct bch_fs *c)
40 {
41         if (current->bio_list) {
42                 spin_lock(&c->bio_submit_lock);
43                 bio_list_add(&c->bio_submit_list, bio);
44                 spin_unlock(&c->bio_submit_lock);
45                 queue_work(bcache_io_wq, &c->bio_submit_work);
46         } else {
47                 generic_make_request(bio);
48         }
49 }
50
51 void bch_bio_submit_work(struct work_struct *work)
52 {
53         struct bch_fs *c = container_of(work, struct bch_fs,
54                                            bio_submit_work);
55         struct bio_list bl;
56         struct bio *bio;
57
58         spin_lock(&c->bio_submit_lock);
59         bl = c->bio_submit_list;
60         bio_list_init(&c->bio_submit_list);
61         spin_unlock(&c->bio_submit_lock);
62
63         while ((bio = bio_list_pop(&bl)))
64                 generic_make_request(bio);
65 }
66
67 /* Allocate, free from mempool: */
68
69 void bch_bio_free_pages_pool(struct bch_fs *c, struct bio *bio)
70 {
71         struct bio_vec *bv;
72         unsigned i;
73
74         bio_for_each_segment_all(bv, bio, i)
75                 if (bv->bv_page != ZERO_PAGE(0))
76                         mempool_free(bv->bv_page, &c->bio_bounce_pages);
77         bio->bi_vcnt = 0;
78 }
79
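/*
 * Bounce-page allocation: try alloc_page() first and only fall back to the
 * bio_bounce_pages mempool if that fails.  bio_bounce_pages_lock is taken on
 * the first fallback and held until bch_bio_alloc_pages_pool() finishes, so
 * only one thread at a time dips into the (limited) mempool.
 */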
80 static void bch_bio_alloc_page_pool(struct bch_fs *c, struct bio *bio,
81                                     bool *using_mempool)
82 {
83         struct bio_vec *bv = &bio->bi_io_vec[bio->bi_vcnt++];
84
85         if (likely(!*using_mempool)) {
86                 bv->bv_page = alloc_page(GFP_NOIO);
87                 if (unlikely(!bv->bv_page)) {
88                         mutex_lock(&c->bio_bounce_pages_lock);
89                         *using_mempool = true;
90                         goto pool_alloc;
91
92                 }
93         } else {
94 pool_alloc:
95                 bv->bv_page = mempool_alloc(&c->bio_bounce_pages, GFP_NOIO);
96         }
97
98         bv->bv_len = PAGE_SIZE;
99         bv->bv_offset = 0;
100 }
101
102 void bch_bio_alloc_pages_pool(struct bch_fs *c, struct bio *bio,
103                               size_t bytes)
104 {
105         bool using_mempool = false;
106
107         bio->bi_iter.bi_size = bytes;
108
109         while (bio->bi_vcnt < DIV_ROUND_UP(bytes, PAGE_SIZE))
110                 bch_bio_alloc_page_pool(c, bio, &using_mempool);
111
112         if (using_mempool)
113                 mutex_unlock(&c->bio_bounce_pages_lock);
114 }
115
116 /* Bios with headers */
117
118 static void bch_submit_wbio(struct bch_fs *c, struct bch_write_bio *wbio,
119                             struct bch_dev *ca, const struct bch_extent_ptr *ptr,
120                             bool punt)
121 {
122         wbio->ca                = ca;
123         wbio->submit_time_us    = local_clock_us();
124         wbio->bio.bi_iter.bi_sector = ptr->offset;
125         wbio->bio.bi_bdev       = ca ? ca->disk_sb.bdev : NULL;
126
127         if (!ca)
128                 bcache_io_error(c, &wbio->bio, "device has been removed");
129         else if (punt)
130                 bch_generic_make_request(&wbio->bio, c);
131         else
132                 generic_make_request(&wbio->bio);
133 }
134
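/*
 * Submit one write bio per pointer in the extent: all but the last pointer get
 * a clone from the device's replica_set bioset, chained back to the original
 * via __bio_inc_remaining() so the original's endio only runs once every
 * replica write has completed; the last pointer reuses @wbio itself.  REQ_FUA
 * is added when the journal doesn't already flush the target device.
 */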
135 void bch_submit_wbio_replicas(struct bch_write_bio *wbio, struct bch_fs *c,
136                               const struct bkey_i *k, bool punt)
137 {
138         struct bkey_s_c_extent e = bkey_i_to_s_c_extent(k);
139         const struct bch_extent_ptr *ptr;
140         struct bch_write_bio *n;
141         struct bch_dev *ca;
142
143         BUG_ON(c->opts.nochanges);
144
145         wbio->split = false;
146         wbio->c = c;
147
148         extent_for_each_ptr(e, ptr) {
149                 ca = c->devs[ptr->dev];
150                 if (!percpu_ref_tryget(&ca->io_ref)) {
151                         bch_submit_wbio(c, wbio, NULL, ptr, punt);
152                         break;
153                 }
154
155                 if (ptr + 1 < &extent_entry_last(e)->ptr) {
156                         n = to_wbio(bio_clone_fast(&wbio->bio, GFP_NOIO,
157                                                    &ca->replica_set));
158
159                         n->bio.bi_end_io        = wbio->bio.bi_end_io;
160                         n->bio.bi_private       = wbio->bio.bi_private;
161                         n->c                    = c;
162                         n->orig                 = &wbio->bio;
163                         n->bounce               = false;
164                         n->split                = true;
165                         n->put_bio              = true;
166                         n->bio.bi_opf           = wbio->bio.bi_opf;
167                         __bio_inc_remaining(n->orig);
168                 } else {
169                         n = wbio;
170                 }
171
172                 if (!journal_flushes_device(ca))
173                         n->bio.bi_opf |= REQ_FUA;
174
175                 bch_submit_wbio(c, n, ca, ptr, punt);
176         }
177 }
178
179 /* IO errors */
180
181 /* Writes */
182
183 static struct workqueue_struct *index_update_wq(struct bch_write_op *op)
184 {
185         return op->alloc_reserve == RESERVE_MOVINGGC
186                 ? op->c->copygc_wq
187                 : op->c->wq;
188 }
189
190 static void __bch_write(struct closure *);
191
192 static void bch_write_done(struct closure *cl)
193 {
194         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
195
196         BUG_ON(!(op->flags & BCH_WRITE_DONE));
197
198         if (!op->error && (op->flags & BCH_WRITE_FLUSH))
199                 op->error = bch_journal_error(&op->c->journal);
200
201         bch_disk_reservation_put(op->c, &op->res);
202         percpu_ref_put(&op->c->writes);
203         bch_keylist_free(&op->insert_keys, op->inline_keys);
204         closure_return(cl);
205 }
206
207 static u64 keylist_sectors(struct keylist *keys)
208 {
209         struct bkey_i *k;
210         u64 ret = 0;
211
212         for_each_keylist_key(keys, k)
213                 ret += k->k.size;
214
215         return ret;
216 }
217
218 static int bch_write_index_default(struct bch_write_op *op)
219 {
220         struct keylist *keys = &op->insert_keys;
221         struct btree_iter iter;
222         int ret;
223
224         bch_btree_iter_init_intent(&iter, op->c, BTREE_ID_EXTENTS,
225                 bkey_start_pos(&bch_keylist_front(keys)->k));
226
227         ret = bch_btree_insert_list_at(&iter, keys, &op->res,
228                                        NULL, op_journal_seq(op),
229                                        BTREE_INSERT_NOFAIL);
230         bch_btree_iter_unlock(&iter);
231
232         return ret;
233 }
234
235 /**
236  * bch_write_index - after a write, update index to point to new data
237  */
238 static void bch_write_index(struct closure *cl)
239 {
240         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
241         struct bch_fs *c = op->c;
242         struct keylist *keys = &op->insert_keys;
243         unsigned i;
244
245         op->flags |= BCH_WRITE_LOOPED;
246
247         if (!bch_keylist_empty(keys)) {
248                 u64 sectors_start = keylist_sectors(keys);
249                 int ret = op->index_update_fn(op);
250
251                 BUG_ON(keylist_sectors(keys) && !ret);
252
253                 op->written += sectors_start - keylist_sectors(keys);
254
255                 if (ret) {
256                         __bcache_io_error(c, "btree IO error %i", ret);
257                         op->error = ret;
258                 }
259         }
260
261         for (i = 0; i < ARRAY_SIZE(op->open_buckets); i++)
262                 if (op->open_buckets[i]) {
263                         bch_open_bucket_put(c,
264                                             c->open_buckets +
265                                             op->open_buckets[i]);
266                         op->open_buckets[i] = 0;
267                 }
268
269         if (!(op->flags & BCH_WRITE_DONE))
270                 continue_at(cl, __bch_write, op->io_wq);
271
272         if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
273                 bch_journal_flush_seq_async(&c->journal,
274                                             *op_journal_seq(op),
275                                             cl);
276                 continue_at(cl, bch_write_done, index_update_wq(op));
277         } else {
278                 continue_at_nobarrier(cl, bch_write_done, NULL);
279         }
280 }
281
282 /**
283  * bch_write_discard - discard range of keys
284  *
285  * Used to implement discard, and to handle the case where a writethrough
286  * write hits a write error on the cache device.
287  */
288 static void bch_write_discard(struct closure *cl)
289 {
290         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
291         struct bio *bio = &op->bio->bio;
292         struct bpos end = op->pos;
293
294         end.offset += bio_sectors(bio);
295
296         op->error = bch_discard(op->c, op->pos, end, op->version,
297                                 &op->res, NULL, NULL);
298 }
299
300 /*
301  * Convert extents to be inserted to discards after an error:
302  */
303 static void bch_write_io_error(struct closure *cl)
304 {
305         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
306
307         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
308                 struct bkey_i *src = bch_keylist_front(&op->insert_keys);
309                 struct bkey_i *dst = bch_keylist_front(&op->insert_keys);
310
311                 /*
312                  * Our data write just errored, which means we've got a bunch
313                  * of keys to insert that point to data that wasn't
314                  * successfully written.
315                  *
316                  * We don't have to insert those keys but we still have to
317                  * invalidate that region of the cache - so, if we just strip
318                  * off all the pointers from the keys we'll accomplish just
319                  * that.
320                  */
321
322                 while (src != op->insert_keys.top) {
323                         struct bkey_i *n = bkey_next(src);
324
325                         set_bkey_val_u64s(&src->k, 0);
326                         src->k.type = KEY_TYPE_DISCARD;
327                         bkey_copy(dst, src);
328
329                         dst = bkey_next(dst);
330                         src = n;
331                 }
332
333                 op->insert_keys.top = dst;
334                 op->flags |= BCH_WRITE_DISCARD;
335         } else {
336                 /* TODO: We could try to recover from this. */
337                 while (!bch_keylist_empty(&op->insert_keys))
338                         bch_keylist_pop_front(&op->insert_keys);
339
340                 op->error = -EIO;
341                 op->flags |= BCH_WRITE_DONE;
342         }
343
344         bch_write_index(cl);
345 }
346
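/*
 * Completion for (possibly cloned) data write bios: on a nonfatal device
 * error, reroute the write op's closure to bch_write_io_error() so the keys
 * are converted to discards or dropped; then release the bounce pages, the
 * clone, and our ref on the device.
 */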
347 static void bch_write_endio(struct bio *bio)
348 {
349         struct closure *cl = bio->bi_private;
350         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
351         struct bch_write_bio *wbio = to_wbio(bio);
352         struct bch_fs *c = wbio->c;
353         struct bio *orig = wbio->orig;
354         struct bch_dev *ca = wbio->ca;
355
356         if (bch_dev_nonfatal_io_err_on(bio->bi_error, ca,
357                                        "data write")) {
358                 set_closure_fn(cl, bch_write_io_error, index_update_wq(op));
359         }
360
361         bch_account_io_completion_time(ca, wbio->submit_time_us,
362                                        REQ_OP_WRITE);
363         if (ca)
364                 percpu_ref_put(&ca->io_ref);
365
366         if (bio->bi_error && orig)
367                 orig->bi_error = bio->bi_error;
368
369         if (wbio->bounce)
370                 bch_bio_free_pages_pool(c, bio);
371
372         if (wbio->put_bio)
373                 bio_put(bio);
374
375         if (orig)
376                 bio_endio(orig);
377         else
378                 closure_put(cl);
379 }
380
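/*
 * Per-extent nonce for checksumming/encryption: packs the crc nonce,
 * uncompressed size, key version and compression type into a 128 bit value,
 * xored with BCH_NONCE_EXTENT so extent nonces occupy their own namespace,
 * distinct from other users of nonces.
 */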
381 static struct nonce extent_nonce(struct bversion version,
382                                  unsigned nonce,
383                                  unsigned uncompressed_size,
384                                  unsigned compression_type)
385 {
386         return (struct nonce) {{
387                 [0] = cpu_to_le32((nonce                << 12) |
388                                   (uncompressed_size    << 22)),
389                 [1] = cpu_to_le32(version.lo),
390                 [2] = cpu_to_le32(version.lo >> 32),
391                 [3] = cpu_to_le32(version.hi |
392                                   (compression_type << 24)) ^ BCH_NONCE_EXTENT,
393         }};
394 }
395
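/*
 * Build the extent key for data that was just written: advance op->pos past
 * it, fill in size/version, append a crc entry describing how the data was
 * checksummed/compressed, append pointers into the open bucket it was written
 * to, then push the key onto op->insert_keys for the index update.
 */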
396 static void init_append_extent(struct bch_write_op *op,
397                                unsigned compressed_size,
398                                unsigned uncompressed_size,
399                                unsigned compression_type,
400                                unsigned nonce,
401                                struct bch_csum csum, unsigned csum_type,
402                                struct open_bucket *ob)
403 {
404         struct bkey_i_extent *e = bkey_extent_init(op->insert_keys.top);
405
406         op->pos.offset += uncompressed_size;
407         e->k.p = op->pos;
408         e->k.size = uncompressed_size;
409         e->k.version = op->version;
410         bkey_extent_set_cached(&e->k, op->flags & BCH_WRITE_CACHED);
411
412         bch_extent_crc_append(e, compressed_size,
413                               uncompressed_size,
414                               compression_type,
415                               nonce, csum, csum_type);
416
417         bch_alloc_sectors_append_ptrs(op->c, e, op->nr_replicas,
418                                       ob, compressed_size);
419
420         bkey_extent_set_cached(&e->k, (op->flags & BCH_WRITE_CACHED));
421         bch_keylist_push(&op->insert_keys);
422 }
423
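/*
 * Write as much of @orig as fits in the open bucket @ob, taking one of three
 * paths: pass through data that is already compressed, bounce and (optionally)
 * compress/encrypt/checksum into freshly allocated pages, or just split off
 * the front of @orig when no checksum or compression is wanted.  Returns > 0
 * if there is still data left in @orig (the caller loops), 0 when done, or a
 * negative error.
 */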
424 static int bch_write_extent(struct bch_write_op *op,
425                             struct open_bucket *ob,
426                             struct bio *orig)
427 {
428         struct bch_fs *c = op->c;
429         struct bio *bio;
430         struct bch_write_bio *wbio;
431         unsigned key_to_write_offset = op->insert_keys.top_p -
432                 op->insert_keys.keys_p;
433         struct bkey_i *key_to_write;
434         unsigned csum_type = op->csum_type;
435         unsigned compression_type = op->compression_type;
436         int ret;
437
438         /* don't refetch csum type/compression type */
439         barrier();
440
441         /* Need to decompress data? */
442         if ((op->flags & BCH_WRITE_DATA_COMPRESSED) &&
443             (crc_uncompressed_size(NULL, &op->crc) != op->size ||
444              crc_compressed_size(NULL, &op->crc) > ob->sectors_free)) {
445                 int ret;
446
447                 ret = bch_bio_uncompress_inplace(c, orig, op->size, op->crc);
448                 if (ret)
449                         return ret;
450
451                 op->flags &= ~BCH_WRITE_DATA_COMPRESSED;
452         }
453
454         if (op->flags & BCH_WRITE_DATA_COMPRESSED) {
455                 init_append_extent(op,
456                                    crc_compressed_size(NULL, &op->crc),
457                                    crc_uncompressed_size(NULL, &op->crc),
458                                    op->crc.compression_type,
459                                    op->crc.nonce,
460                                    op->crc.csum,
461                                    op->crc.csum_type,
462                                    ob);
463
464                 bio                     = orig;
465                 wbio                    = to_wbio(bio);
466                 wbio->orig              = NULL;
467                 wbio->bounce            = false;
468                 wbio->put_bio           = false;
469                 ret                     = 0;
470         } else if (csum_type != BCH_CSUM_NONE ||
471                    compression_type != BCH_COMPRESSION_NONE) {
472                 /* all units here in bytes */
473                 unsigned total_output = 0, output_available =
474                         min(ob->sectors_free << 9, orig->bi_iter.bi_size);
475                 unsigned crc_nonce = bch_csum_type_is_encryption(csum_type)
476                         ? op->nonce : 0;
477                 struct bch_csum csum;
478                 struct nonce nonce;
479
480                 bio = bio_alloc_bioset(GFP_NOIO,
481                                        DIV_ROUND_UP(output_available, PAGE_SIZE),
482                                        &c->bio_write);
483                 /*
484                  * XXX: can't use mempool for more than
485                  * BCH_COMPRESSED_EXTENT_MAX worth of pages
486                  */
487                 bch_bio_alloc_pages_pool(c, bio, output_available);
488
489                 /* copy WRITE_SYNC flag */
490                 bio->bi_opf             = orig->bi_opf;
491                 wbio                    = to_wbio(bio);
492                 wbio->orig              = NULL;
493                 wbio->bounce            = true;
494                 wbio->put_bio           = true;
495
496                 do {
497                         unsigned fragment_compression_type = compression_type;
498                         size_t dst_len, src_len;
499
500                         bch_bio_compress(c, bio, &dst_len,
501                                          orig, &src_len,
502                                          &fragment_compression_type);
503
504                         BUG_ON(!dst_len || dst_len > bio->bi_iter.bi_size);
505                         BUG_ON(!src_len || src_len > orig->bi_iter.bi_size);
506                         BUG_ON(dst_len & (block_bytes(c) - 1));
507                         BUG_ON(src_len & (block_bytes(c) - 1));
508
509                         swap(bio->bi_iter.bi_size, dst_len);
510                         nonce = extent_nonce(op->version,
511                                              crc_nonce,
512                                              src_len >> 9,
513                                              compression_type);
514
515                         bch_encrypt_bio(c, csum_type, nonce, bio);
516
517                         csum = bch_checksum_bio(c, csum_type, nonce, bio);
518                         swap(bio->bi_iter.bi_size, dst_len);
519
520                         init_append_extent(op,
521                                            dst_len >> 9, src_len >> 9,
522                                            fragment_compression_type,
523                                            crc_nonce, csum, csum_type, ob);
524
525                         total_output += dst_len;
526                         bio_advance(bio, dst_len);
527                         bio_advance(orig, src_len);
528                 } while (bio->bi_iter.bi_size &&
529                          orig->bi_iter.bi_size &&
530                          !bch_keylist_realloc(&op->insert_keys,
531                                               op->inline_keys,
532                                               ARRAY_SIZE(op->inline_keys),
533                                               BKEY_EXTENT_U64s_MAX));
534
535                 BUG_ON(total_output > output_available);
536
537                 memset(&bio->bi_iter, 0, sizeof(bio->bi_iter));
538                 bio->bi_iter.bi_size = total_output;
539
540                 /*
541                  * Free unneeded pages after compressing:
542                  */
543                 while (bio->bi_vcnt * PAGE_SIZE >
544                        round_up(bio->bi_iter.bi_size, PAGE_SIZE))
545                         mempool_free(bio->bi_io_vec[--bio->bi_vcnt].bv_page,
546                                      &c->bio_bounce_pages);
547
548                 ret = orig->bi_iter.bi_size != 0;
549         } else {
550                 bio = bio_next_split(orig, ob->sectors_free, GFP_NOIO,
551                                      &c->bio_write);
552
553                 wbio                    = to_wbio(bio);
554                 wbio->orig              = NULL;
555                 wbio->bounce            = false;
556                 wbio->put_bio           = bio != orig;
557
558                 init_append_extent(op, bio_sectors(bio), bio_sectors(bio),
559                                    compression_type, 0,
560                                    (struct bch_csum) { 0 }, csum_type, ob);
561
562                 ret = bio != orig;
563         }
564
565         bio->bi_end_io  = bch_write_endio;
566         bio->bi_private = &op->cl;
567         bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
568
569         closure_get(bio->bi_private);
570
571         /* might have done a realloc... */
572
573         key_to_write = (void *) (op->insert_keys.keys_p + key_to_write_offset);
574
575         bch_check_mark_super(c, key_to_write, false);
576
577 #ifndef CONFIG_BCACHE_NO_IO
578         bch_submit_wbio_replicas(to_wbio(bio), c, key_to_write, false);
579 #else
580         to_wbio(bio)->ca = NULL;
581         bio_endio(bio);
582 #endif
583         return ret;
584 }
585
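/*
 * Main write path loop: allocate sectors from a write point, write one extent
 * with bch_write_extent(), and repeat until the whole bio has been consumed.
 * When the open_buckets array or the insert keylist fills up, the keys written
 * so far are inserted first (bch_write_index) and the loop is re-entered from
 * op->io_wq with BCH_WRITE_LOOPED set.  Pure discards skip the data write and
 * go straight to bch_write_discard().
 */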
586 static void __bch_write(struct closure *cl)
587 {
588         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
589         struct bch_fs *c = op->c;
590         struct bio *bio = &op->bio->bio;
591         unsigned open_bucket_nr = 0;
592         struct open_bucket *b;
593         int ret;
594
595         memset(op->open_buckets, 0, sizeof(op->open_buckets));
596
597         if (op->flags & BCH_WRITE_DISCARD) {
598                 op->flags |= BCH_WRITE_DONE;
599                 bch_write_discard(cl);
600                 bio_put(bio);
601                 continue_at(cl, bch_write_done, index_update_wq(op));
602         }
603
604         /*
605          * Journal writes are marked REQ_PREFLUSH; if the original write was a
606          * flush, it'll wait on the journal write.
607          */
608         bio->bi_opf &= ~(REQ_PREFLUSH|REQ_FUA);
609
610         do {
611                 EBUG_ON(bio->bi_iter.bi_sector != op->pos.offset);
612                 EBUG_ON(!bio_sectors(bio));
613
614                 if (open_bucket_nr == ARRAY_SIZE(op->open_buckets))
615                         continue_at(cl, bch_write_index, index_update_wq(op));
616
617                 /* for the device pointers and 1 for the checksum */
618                 if (bch_keylist_realloc(&op->insert_keys,
619                                         op->inline_keys,
620                                         ARRAY_SIZE(op->inline_keys),
621                                         BKEY_EXTENT_U64s_MAX))
622                         continue_at(cl, bch_write_index, index_update_wq(op));
623
624                 b = bch_alloc_sectors_start(c, op->wp,
625                         op->nr_replicas,
626                         c->opts.data_replicas_required,
627                         op->alloc_reserve,
628                         (op->flags & BCH_WRITE_ALLOC_NOWAIT) ? NULL : cl);
629                 EBUG_ON(!b);
630
631                 if (unlikely(IS_ERR(b))) {
632                         if (unlikely(PTR_ERR(b) != -EAGAIN)) {
633                                 ret = PTR_ERR(b);
634                                 goto err;
635                         }
636
637                         /*
638                          * If we already have some keys, must insert them first
639                          * before allocating another open bucket. We only hit
640                          * this case if open_bucket_nr > 1.
641                          */
642                         if (!bch_keylist_empty(&op->insert_keys))
643                                 continue_at(cl, bch_write_index,
644                                             index_update_wq(op));
645
646                         /*
647                          * If we've looped, we're running out of a workqueue -
648                          * not the bch_write() caller's context - and we don't
649                          * want to block the workqueue:
650                          */
651                         if (op->flags & BCH_WRITE_LOOPED)
652                                 continue_at(cl, __bch_write, op->io_wq);
653
654                         /*
655                          * Otherwise, we do want to block the caller on alloc
656                          * failure instead of letting it queue up more and more
657                          * writes:
658                          * XXX: this technically needs a try_to_freeze() -
659                          * except that that's not safe because caller may have
660                          * issued other IO... hmm..
661                          */
662                         closure_sync(cl);
663                         continue;
664                 }
665
666                 BUG_ON(b - c->open_buckets == 0 ||
667                        b - c->open_buckets > U8_MAX);
668                 op->open_buckets[open_bucket_nr++] = b - c->open_buckets;
669
670                 ret = bch_write_extent(op, b, bio);
671
672                 bch_alloc_sectors_done(c, op->wp, b);
673
674                 if (ret < 0)
675                         goto err;
676         } while (ret);
677
678         op->flags |= BCH_WRITE_DONE;
679         continue_at(cl, bch_write_index, index_update_wq(op));
680 err:
681         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
682                 /*
683                  * If we were writing cached data, not doing the write is fine
684                  * so long as we discard whatever would have been overwritten -
685                  * then it's equivalent to doing the write and immediately
686                  * reclaiming it.
687                  */
688
689                 bch_write_discard(cl);
690         } else {
691                 /*
692                  * Right now we can only error here if we went RO - the
693                  * allocation failed, but we already checked for -ENOSPC when we
694                  * got our reservation.
695                  *
696                  * XXX capacity might have changed, but we don't check for that
697                  * yet:
698                  */
699                 op->error = ret;
700         }
701
702         op->flags |= BCH_WRITE_DONE;
703
704         /*
705          * No reason not to insert keys for whatever data was successfully
706          * written (especially for a cmpxchg operation that's moving data
707          * around)
708          */
709         continue_at(cl, !bch_keylist_empty(&op->insert_keys)
710                     ? bch_write_index
711                     : bch_write_done, index_update_wq(op));
712 }
713
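/*
 * Timer callback for the foreground write throttle: walk the list of writes
 * that bch_write() put to sleep, releasing (via closure_put()) every op whose
 * delay has expired, and re-arm the timer for the first one that hasn't.
 */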
714 void bch_wake_delayed_writes(unsigned long data)
715 {
716         struct bch_fs *c = (void *) data;
717         struct bch_write_op *op;
718         unsigned long flags;
719
720         spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
721
722         while ((op = c->write_wait_head)) {
723                 if (time_after(op->expires, jiffies)) {
724                         mod_timer(&c->foreground_write_wakeup, op->expires);
725                         break;
726                 }
727
728                 c->write_wait_head = op->next;
729                 if (!c->write_wait_head)
730                         c->write_wait_tail = NULL;
731
732                 closure_put(&op->cl);
733         }
734
735         spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
736 }
737
738 /**
739  * bch_write - handle a write to a cache device or flash only volume
740  *
741  * This is the starting point for any data to end up in a cache device; it could
742  * be from a normal write, or a writeback write, or a write to a flash only
743  * volume - it's also used by the moving garbage collector to compact data in
744  * mostly empty buckets.
745  *
746  * It first writes the data to the cache, creating a list of keys to be inserted
747  * (if the data won't fit in a single open bucket, there will be multiple keys);
748  * after the data is written it calls bch_journal, and after the keys have been
749  * added to the next journal write they're inserted into the btree.
750  *
751  * It writes the data in op->bio; op->pos supplies the inode and starting
752  * offset that the resulting keys are inserted at.
753  *
754  * If BCH_WRITE_DISCARD is set in op->flags, instead of inserting the data it
755  * invalidates the region described by op->bio and op->pos.
756  */
757 void bch_write(struct closure *cl)
758 {
759         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
760         struct bio *bio = &op->bio->bio;
761         struct bch_fs *c = op->c;
762         u64 inode = op->pos.inode;
763
764         trace_bcache_write(c, inode, bio,
765                            !(op->flags & BCH_WRITE_CACHED),
766                            op->flags & BCH_WRITE_DISCARD);
767
768         if (c->opts.nochanges ||
769             !percpu_ref_tryget(&c->writes)) {
770                 __bcache_io_error(c, "read only");
771                 op->error = -EROFS;
772                 bch_disk_reservation_put(c, &op->res);
773                 closure_return(cl);
774         }
775
776         if (bversion_zero(op->version) &&
777             bch_csum_type_is_encryption(op->csum_type))
778                 op->version.lo =
779                         atomic64_inc_return(&c->key_version) + 1;
780
781         if (!(op->flags & BCH_WRITE_DISCARD))
782                 bch_increment_clock(c, bio_sectors(bio), WRITE);
783
784         if (!(op->flags & BCH_WRITE_DISCARD))
785                 bch_mark_foreground_write(c, bio_sectors(bio));
786         else
787                 bch_mark_discard(c, bio_sectors(bio));
788
789         /* Don't bother throttling if the rate is >= 1 GB/sec */
790
791         if (c->foreground_write_ratelimit_enabled &&
792             c->foreground_write_pd.rate.rate < (1 << 30) &&
793             !(op->flags & BCH_WRITE_DISCARD) && op->wp->throttle) {
794                 unsigned long flags;
795                 u64 delay;
796
797                 spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
798                 bch_ratelimit_increment(&c->foreground_write_pd.rate,
799                                         bio->bi_iter.bi_size);
800
801                 delay = bch_ratelimit_delay(&c->foreground_write_pd.rate);
802
803                 if (delay >= HZ / 100) {
804                         trace_bcache_write_throttle(c, inode, bio, delay);
805
806                         closure_get(&op->cl); /* list takes a ref */
807
808                         op->expires = jiffies + delay;
809                         op->next = NULL;
810
811                         if (c->write_wait_tail)
812                                 c->write_wait_tail->next = op;
813                         else
814                                 c->write_wait_head = op;
815                         c->write_wait_tail = op;
816
817                         if (!timer_pending(&c->foreground_write_wakeup))
818                                 mod_timer(&c->foreground_write_wakeup,
819                                           op->expires);
820
821                         spin_unlock_irqrestore(&c->foreground_write_pd_lock,
822                                                flags);
823                         continue_at(cl, __bch_write, index_update_wq(op));
824                 }
825
826                 spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
827         }
828
829         continue_at_nobarrier(cl, __bch_write, NULL);
830 }
831
832 void bch_write_op_init(struct bch_write_op *op, struct bch_fs *c,
833                        struct bch_write_bio *bio, struct disk_reservation res,
834                        struct write_point *wp, struct bpos pos,
835                        u64 *journal_seq, unsigned flags)
836 {
837         EBUG_ON(res.sectors && !res.nr_replicas);
838
839         op->c           = c;
840         op->io_wq       = index_update_wq(op);
841         op->bio         = bio;
842         op->written     = 0;
843         op->error       = 0;
844         op->flags       = flags;
845         op->csum_type   = bch_data_checksum_type(c);
846         op->compression_type = c->opts.compression;
847         op->nr_replicas = res.nr_replicas;
848         op->alloc_reserve = RESERVE_NONE;
849         op->nonce       = 0;
850         op->pos         = pos;
851         op->version     = ZERO_VERSION;
852         op->res         = res;
853         op->wp          = wp;
854
855         if (journal_seq) {
856                 op->journal_seq_p = journal_seq;
857                 op->flags |= BCH_WRITE_JOURNAL_SEQ_PTR;
858         } else {
859                 op->journal_seq = 0;
860         }
861
862         op->index_update_fn = bch_write_index_default;
863
864         bch_keylist_init(&op->insert_keys,
865                          op->inline_keys,
866                          ARRAY_SIZE(op->inline_keys));
867
868         if (version_stress_test(c))
869                 get_random_bytes(&op->version, sizeof(op->version));
870 }
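
/*
 * Illustrative usage sketch (not part of this file): a caller embeds a
 * struct bch_write_op, obtains a disk reservation and a write point, then
 * kicks the write off as a closure, e.g.:
 *
 *      bch_write_op_init(&op, c, wbio, res, wp, POS(inode, sector),
 *                        &journal_seq, 0);
 *      closure_call(&op.cl, bch_write, NULL, &parent_cl);
 *
 * wbio, res, wp, inode, sector, journal_seq and parent_cl are placeholders
 * here; how the reservation, write point and bio are set up is entirely up
 * to the caller.
 */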
871
872 /* Discard */
873
874 /* bch_discard - discard a range of keys from start to end.
875  * @c           filesystem
876  * @start       start of the range to discard
877  *              NOTE: the discard starts at start.offset
878  * @end         end of the range to discard
879  *              NOTE: the discard ends at end.offset
880  * @version     version of discard (ZERO_VERSION if none)
881  *
882  * Returns:
883  *       0 on success
884  *      <0 on error
885  *
886  * XXX: this needs to be refactored with inode_truncate, or more
887  *      appropriately inode_truncate should call this
888  */
889 int bch_discard(struct bch_fs *c, struct bpos start,
890                 struct bpos end, struct bversion version,
891                 struct disk_reservation *disk_res,
892                 struct extent_insert_hook *hook,
893                 u64 *journal_seq)
894 {
895         return bch_btree_delete_range(c, BTREE_ID_EXTENTS, start, end, version,
896                                       disk_res, hook, journal_seq);
897 }
898
899 /* Cache promotion on read */
900
901 struct cache_promote_op {
902         struct closure          cl;
903         struct migrate_write    write;
904         struct bio_vec          bi_inline_vecs[0]; /* must be last */
905 };
906
907 /* Read */
908
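/*
 * Read-side data path: verify the checksum over the (possibly bounced) data
 * we read, then decrypt and, if the data was bounced or compressed,
 * decompress/copy the live range back into the parent bio.  Returns -EIO on a
 * checksum mismatch; bad compressed data is never decompressed.
 */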
909 static int bio_checksum_uncompress(struct bch_fs *c,
910                                    struct bch_read_bio *rbio)
911 {
912         struct bio *src = &rbio->bio;
913         struct bio *dst = &bch_rbio_parent(rbio)->bio;
914         struct bvec_iter dst_iter = rbio->parent_iter;
915         struct nonce nonce = extent_nonce(rbio->version,
916                                 rbio->crc.nonce,
917                                 crc_uncompressed_size(NULL, &rbio->crc),
918                                 rbio->crc.compression_type);
919         struct bch_csum csum;
920         int ret = 0;
921
922         /*
923          * reset iterator for checksumming and copying bounced data: here we've
924          * set rbio->compressed_size to the amount of data we actually read,
925          * which was not necessarily the full extent if we were only bouncing
926          * in order to promote
927          */
928         if (rbio->bounce) {
929                 src->bi_iter.bi_size    = crc_compressed_size(NULL, &rbio->crc) << 9;
930                 src->bi_iter.bi_idx     = 0;
931                 src->bi_iter.bi_bvec_done = 0;
932         } else {
933                 src->bi_iter = rbio->parent_iter;
934         }
935
936         csum = bch_checksum_bio(c, rbio->crc.csum_type, nonce, src);
937         if (bch_dev_nonfatal_io_err_on(bch_crc_cmp(rbio->crc.csum, csum), rbio->ca,
938                         "data checksum error, inode %llu offset %llu: expected %0llx%0llx got %0llx%0llx (type %u)",
939                         rbio->inode, (u64) rbio->parent_iter.bi_sector << 9,
940                         rbio->crc.csum.hi, rbio->crc.csum.lo, csum.hi, csum.lo,
941                         rbio->crc.csum_type))
942                 ret = -EIO;
943
944         /*
945          * If there was a checksum error, still copy the data back - unless it
946          * was compressed, we don't want to decompress bad data:
947          */
948         if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
949                 if (!ret) {
950                         bch_encrypt_bio(c, rbio->crc.csum_type, nonce, src);
951                         ret = bch_bio_uncompress(c, src, dst,
952                                                  dst_iter, rbio->crc);
953                         if (ret)
954                                 __bcache_io_error(c, "decompression error");
955                 }
956         } else if (rbio->bounce) {
957                 bio_advance(src, rbio->crc.offset << 9);
958
959                 /* don't need to decrypt the entire bio: */
960                 BUG_ON(src->bi_iter.bi_size < dst_iter.bi_size);
961                 src->bi_iter.bi_size = dst_iter.bi_size;
962
963                 nonce = nonce_add(nonce, rbio->crc.offset << 9);
964
965                 bch_encrypt_bio(c, rbio->crc.csum_type,
966                                 nonce, src);
967
968                 bio_copy_data_iter(dst, dst_iter,
969                                    src, src->bi_iter);
970         } else {
971                 bch_encrypt_bio(c, rbio->crc.csum_type, nonce, src);
972         }
973
974         return ret;
975 }
976
977 static void bch_rbio_free(struct bch_read_bio *rbio)
978 {
979         struct bch_fs *c = rbio->c;
980         struct bio *bio = &rbio->bio;
981
982         BUG_ON(rbio->ca);
983         BUG_ON(!rbio->split);
984
985         if (rbio->promote)
986                 kfree(rbio->promote);
987         if (rbio->bounce)
988                 bch_bio_free_pages_pool(c, bio);
989
990         bio_put(bio);
991 }
992
993 static void bch_rbio_done(struct bch_read_bio *rbio)
994 {
995         struct bio *orig = &bch_rbio_parent(rbio)->bio;
996
997         percpu_ref_put(&rbio->ca->io_ref);
998         rbio->ca = NULL;
999
1000         if (rbio->split) {
1001                 if (rbio->bio.bi_error)
1002                         orig->bi_error = rbio->bio.bi_error;
1003
1004                 bio_endio(orig);
1005                 bch_rbio_free(rbio);
1006         } else {
1007                 if (rbio->promote)
1008                         kfree(rbio->promote);
1009
1010                 orig->bi_end_io = rbio->orig_bi_end_io;
1011                 bio_endio_nodec(orig);
1012         }
1013 }
1014
1015 static void bch_rbio_error(struct bch_read_bio *rbio, int error)
1016 {
1017         bch_rbio_parent(rbio)->bio.bi_error = error;
1018         bch_rbio_done(rbio);
1019 }
1020
1021 static void bch_rbio_retry(struct bch_fs *c, struct bch_read_bio *rbio)
1022 {
1023         unsigned long flags;
1024
1025         percpu_ref_put(&rbio->ca->io_ref);
1026         rbio->ca = NULL;
1027
1028         spin_lock_irqsave(&c->read_retry_lock, flags);
1029         bio_list_add(&c->read_retry_list, &rbio->bio);
1030         spin_unlock_irqrestore(&c->read_retry_lock, flags);
1031         queue_work(c->wq, &c->read_retry_work);
1032 }
1033
1034 static void cache_promote_done(struct closure *cl)
1035 {
1036         struct cache_promote_op *op =
1037                 container_of(cl, struct cache_promote_op, cl);
1038
1039         bch_bio_free_pages_pool(op->write.op.c, &op->write.wbio.bio);
1040         kfree(op);
1041 }
1042
1043 /* Inner part that may run in process context */
1044 static void __bch_read_endio(struct work_struct *work)
1045 {
1046         struct bch_read_bio *rbio =
1047                 container_of(work, struct bch_read_bio, work);
1048         struct bch_fs *c = rbio->c;
1049         int ret;
1050
1051         ret = bio_checksum_uncompress(c, rbio);
1052         if (ret) {
1053                 /*
1054                  * Checksum error: if the bio wasn't bounced, we may have been
1055                  * reading into buffers owned by userspace (that userspace can
1056                  * scribble over) - retry the read, bouncing it this time:
1057                  */
1058                 if (!rbio->bounce && (rbio->flags & BCH_READ_USER_MAPPED)) {
1059                         rbio->flags |= BCH_READ_FORCE_BOUNCE;
1060                         bch_rbio_retry(c, rbio);
1061                 } else {
1062                         bch_rbio_error(rbio, -EIO);
1063                 }
1064                 return;
1065         }
1066
1067         if (rbio->promote) {
1068                 struct cache_promote_op *promote = rbio->promote;
1069                 struct closure *cl = &promote->cl;
1070
1071                 BUG_ON(!rbio->split || !rbio->bounce);
1072
1073                 /* we now own pages: */
1074                 swap(promote->write.wbio.bio.bi_vcnt, rbio->bio.bi_vcnt);
1075                 rbio->promote = NULL;
1076
1077                 bch_rbio_done(rbio);
1078
1079                 closure_init(cl, &c->cl);
1080                 closure_call(&promote->write.op.cl, bch_write, c->wq, cl);
1081                 closure_return_with_destructor(cl, cache_promote_done);
1082         } else {
1083                 bch_rbio_done(rbio);
1084         }
1085 }
1086
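/*
 * Raw bio completion for reads: handle device errors and stale-pointer races,
 * then hand the rest of the work to __bch_read_endio() - on system_unbound_wq
 * if it involves decompression or decryption, on system_highpri_wq if it is
 * just a checksum, or inline if there is nothing to verify.
 */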
1087 static void bch_read_endio(struct bio *bio)
1088 {
1089         struct bch_read_bio *rbio =
1090                 container_of(bio, struct bch_read_bio, bio);
1091         struct bch_fs *c = rbio->c;
1092
1093         if (rbio->flags & BCH_READ_ACCOUNT_TIMES)
1094                 bch_account_io_completion_time(rbio->ca, rbio->submit_time_us,
1095                                                REQ_OP_READ);
1096
1097         if (bch_dev_nonfatal_io_err_on(bio->bi_error, rbio->ca, "data read")) {
1098                 /* XXX: retry IO errors when we have another replica */
1099                 bch_rbio_error(rbio, bio->bi_error);
1100                 return;
1101         }
1102
1103         if (rbio->ptr.cached &&
1104             (((rbio->flags & BCH_READ_RETRY_IF_STALE) && race_fault()) ||
1105              ptr_stale(rbio->ca, &rbio->ptr))) {
1106                 atomic_long_inc(&c->cache_read_races);
1107
1108                 if (rbio->flags & BCH_READ_RETRY_IF_STALE)
1109                         bch_rbio_retry(c, rbio);
1110                 else
1111                         bch_rbio_error(rbio, -EINTR);
1112                 return;
1113         }
1114
1115         if (rbio->crc.compression_type ||
1116             bch_csum_type_is_encryption(rbio->crc.csum_type))
1117                 queue_work(system_unbound_wq, &rbio->work);
1118         else if (rbio->crc.csum_type)
1119                 queue_work(system_highpri_wq, &rbio->work);
1120         else
1121                 __bch_read_endio(&rbio->work);
1122 }
1123
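/*
 * Promote (rewrite into a faster tier) only if the caller asked for it, the
 * filesystem is still accepting writes, and the extent was read from a device
 * in a slower tier than the fastest one.
 */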
1124 static bool should_promote(struct bch_fs *c,
1125                            struct extent_pick_ptr *pick, unsigned flags)
1126 {
1127         if (!(flags & BCH_READ_PROMOTE))
1128                 return false;
1129
1130         if (percpu_ref_is_dying(&c->writes))
1131                 return false;
1132
1133         return c->fastest_tier &&
1134                 c->fastest_tier < c->tiers + pick->ca->mi.tier;
1135 }
1136
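/*
 * Set up and submit the read for a single extent: decide whether the read has
 * to be bounced (compressed data, a partial read of a checksummed extent,
 * encrypted data destined for user-mapped pages, or a promote), whether the
 * bio has to be cloned or can be reused, and optionally allocate a
 * cache_promote_op that will rewrite the data via bch_write() once the read
 * completes.
 */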
1137 void bch_read_extent_iter(struct bch_fs *c, struct bch_read_bio *orig,
1138                           struct bvec_iter iter, struct bkey_s_c k,
1139                           struct extent_pick_ptr *pick, unsigned flags)
1140 {
1141         struct bch_read_bio *rbio;
1142         struct cache_promote_op *promote_op = NULL;
1143         unsigned skip = iter.bi_sector - bkey_start_offset(k.k);
1144         bool bounce = false, split, read_full = false;
1145
1146         EBUG_ON(bkey_start_offset(k.k) > iter.bi_sector ||
1147                 k.k->p.offset < bvec_iter_end_sector(iter));
1148
1149         /* only promote if we're not reading from the fastest tier: */
1150
1151         /*
1152          * XXX: multiple promotes can race with each other, wastefully. Keep a
1153          * list of outstanding promotes?
1154          */
1155         if (should_promote(c, pick, flags)) {
1156                 /*
1157                  * biovec needs to be big enough to hold decompressed data, if
1158                  * the bch_write_extent() has to decompress/recompress it:
1159                  */
1160                 unsigned sectors =
1161                         max_t(unsigned, k.k->size,
1162                               crc_uncompressed_size(NULL, &pick->crc));
1163                 unsigned pages = DIV_ROUND_UP(sectors, PAGE_SECTORS);
1164
1165                 promote_op = kmalloc(sizeof(*promote_op) +
1166                                 sizeof(struct bio_vec) * pages, GFP_NOIO);
1167                 if (promote_op) {
1168                         struct bio *promote_bio = &promote_op->write.wbio.bio;
1169
1170                         bio_init(promote_bio);
1171                         promote_bio->bi_max_vecs = pages;
1172                         promote_bio->bi_io_vec  = promote_bio->bi_inline_vecs;
1173                         bounce = true;
1174                         /* could also set read_full */
1175                 }
1176         }
1177
1178         /*
1179          * note: if compression_type and crc_type both == none, then
1180          * compressed/uncompressed size is zero
1181          */
1182         if (pick->crc.compression_type != BCH_COMPRESSION_NONE ||
1183             (pick->crc.csum_type != BCH_CSUM_NONE &&
1184              (bvec_iter_sectors(iter) != crc_uncompressed_size(NULL, &pick->crc) ||
1185               (bch_csum_type_is_encryption(pick->crc.csum_type) &&
1186                (flags & BCH_READ_USER_MAPPED)) ||
1187               (flags & BCH_READ_FORCE_BOUNCE)))) {
1188                 read_full = true;
1189                 bounce = true;
1190         }
1191
1192         if (bounce) {
1193                 unsigned sectors = read_full
1194                         ? (crc_compressed_size(NULL, &pick->crc) ?: k.k->size)
1195                         : bvec_iter_sectors(iter);
1196
1197                 rbio = container_of(bio_alloc_bioset(GFP_NOIO,
1198                                         DIV_ROUND_UP(sectors, PAGE_SECTORS),
1199                                         &c->bio_read_split),
1200                                     struct bch_read_bio, bio);
1201
1202                 bch_bio_alloc_pages_pool(c, &rbio->bio, sectors << 9);
1203                 split = true;
1204         } else if (!(flags & BCH_READ_MAY_REUSE_BIO) ||
1205                    !(flags & BCH_READ_IS_LAST)) {
1206                 /*
1207                  * Have to clone if there were any splits, because of error
1208                  * reporting: if a split errored and retrying didn't work,
1209                  * then when it reports the error to its parent (us) we don't
1210                  * know whether the error was from our split (and we should
1211                  * retry) or from the whole bio (in which case we don't want
1212                  * to retry and lose the error).
1213                  */
1214                 rbio = container_of(bio_clone_fast(&orig->bio,
1215                                         GFP_NOIO, &c->bio_read_split),
1216                                     struct bch_read_bio, bio);
1217                 rbio->bio.bi_iter = iter;
1218                 split = true;
1219         } else {
1220                 rbio = orig;
1221                 rbio->bio.bi_iter = iter;
1222                 split = false;
1223                 BUG_ON(bio_flagged(&rbio->bio, BIO_CHAIN));
1224         }
1225
1226         if (!(flags & BCH_READ_IS_LAST))
1227                 __bio_inc_remaining(&orig->bio);
1228
1229         if (split)
1230                 rbio->parent    = orig;
1231         else
1232                 rbio->orig_bi_end_io = orig->bio.bi_end_io;
1233         rbio->parent_iter       = iter;
1234
1235         rbio->flags             = flags;
1236         rbio->bounce            = bounce;
1237         rbio->split             = split;
1238         rbio->c                 = c;
1239         rbio->ca                = pick->ca;
1240         rbio->ptr               = pick->ptr;
1241         rbio->crc               = pick->crc;
1242         /*
1243          * crc.compressed_size will be 0 if there wasn't any checksum
1244          * information; we also need to stash the original size of the bio if we
1245          * bounced (which isn't necessarily the original key size, if we bounced
1246          * only for promoting)
1247          */
1248         rbio->crc._compressed_size = bio_sectors(&rbio->bio) - 1;
1249         rbio->version           = k.k->version;
1250         rbio->promote           = promote_op;
1251         rbio->inode             = k.k->p.inode;
1252         INIT_WORK(&rbio->work, __bch_read_endio);
1253
1254         rbio->bio.bi_bdev       = pick->ca->disk_sb.bdev;
1255         rbio->bio.bi_opf        = orig->bio.bi_opf;
1256         rbio->bio.bi_iter.bi_sector = pick->ptr.offset;
1257         rbio->bio.bi_end_io     = bch_read_endio;
1258
1259         if (promote_op) {
1260                 struct bio *promote_bio = &promote_op->write.wbio.bio;
1261
1262                 promote_bio->bi_iter = rbio->bio.bi_iter;
1263                 memcpy(promote_bio->bi_io_vec, rbio->bio.bi_io_vec,
1264                        sizeof(struct bio_vec) * rbio->bio.bi_vcnt);
1265
1266                 bch_migrate_write_init(c, &promote_op->write,
1267                                        &c->promote_write_point,
1268                                        k, NULL,
1269                                        BCH_WRITE_ALLOC_NOWAIT|
1270                                        BCH_WRITE_CACHED);
1271                 promote_op->write.promote = true;
1272
1273                 if (rbio->crc.compression_type) {
1274                         promote_op->write.op.flags |= BCH_WRITE_DATA_COMPRESSED;
1275                         promote_op->write.op.crc = rbio->crc;
1276                         promote_op->write.op.size = k.k->size;
1277                 } else if (read_full) {
1278                         /*
1279                          * Adjust bio to correspond to _live_ portion of @k -
1280                          * which might be less than what we're actually reading:
1281                          */
1282                         bio_advance(promote_bio, rbio->crc.offset << 9);
1283                         BUG_ON(bio_sectors(promote_bio) < k.k->size);
1284                         promote_bio->bi_iter.bi_size = k.k->size << 9;
1285                 } else {
1286                         /*
1287                          * Set insert pos to correspond to what we're actually
1288                          * reading:
1289                          */
1290                         promote_op->write.op.pos.offset = iter.bi_sector;
1291                 }
1292
1293                 promote_bio->bi_iter.bi_sector =
1294                         promote_op->write.op.pos.offset;
1295         }
1296
1297         /* _after_ the promote code has looked at rbio->crc.offset */
1298         if (read_full)
1299                 rbio->crc.offset += skip;
1300         else
1301                 rbio->bio.bi_iter.bi_sector += skip;
1302
1303         rbio->submit_time_us = local_clock_us();
1304
1305 #ifndef CONFIG_BCACHE_NO_IO
1306         generic_make_request(&rbio->bio);
1307 #else
1308         bio_endio(&rbio->bio);
1309 #endif
1310 }
1311
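/*
 * Walk the extents btree over the requested range, issuing one read per
 * extent: holes are zero filled, and the btree iterator is unlocked before
 * each submission so that we are not holding btree locks across IO.
 */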
1312 static void bch_read_iter(struct bch_fs *c, struct bch_read_bio *rbio,
1313                           struct bvec_iter bvec_iter, u64 inode,
1314                           unsigned flags)
1315 {
1316         struct bio *bio = &rbio->bio;
1317         struct btree_iter iter;
1318         struct bkey_s_c k;
1319         int ret;
1320
1321         for_each_btree_key_with_holes(&iter, c, BTREE_ID_EXTENTS,
1322                                       POS(inode, bvec_iter.bi_sector), k) {
1323                 BKEY_PADDED(k) tmp;
1324                 struct extent_pick_ptr pick;
1325                 unsigned bytes, sectors;
1326                 bool is_last;
1327
1328                 /*
1329                  * Unlock the iterator while the btree node's lock is still in
1330                  * cache, before doing the IO:
1331                  */
1332                 bkey_reassemble(&tmp.k, k);
1333                 k = bkey_i_to_s_c(&tmp.k);
1334                 bch_btree_iter_unlock(&iter);
1335
1336                 bch_extent_pick_ptr(c, k, &pick);
1337                 if (IS_ERR(pick.ca)) {
1338                         bcache_io_error(c, bio, "no device to read from");
1339                         bio_endio(bio);
1340                         return;
1341                 }
1342
1343                 sectors = min_t(u64, k.k->p.offset,
1344                                 bvec_iter_end_sector(bvec_iter)) -
1345                         bvec_iter.bi_sector;
1346                 bytes = sectors << 9;
1347                 is_last = bytes == bvec_iter.bi_size;
1348                 swap(bvec_iter.bi_size, bytes);
1349
1350                 if (is_last)
1351                         flags |= BCH_READ_IS_LAST;
1352
1353                 if (pick.ca) {
1354                         PTR_BUCKET(pick.ca, &pick.ptr)->read_prio =
1355                                 c->prio_clock[READ].hand;
1356
1357                         bch_read_extent_iter(c, rbio, bvec_iter,
1358                                              k, &pick, flags);
1359
1360                         flags &= ~BCH_READ_MAY_REUSE_BIO;
1361                 } else {
1362                         zero_fill_bio_iter(bio, bvec_iter);
1363
1364                         if (is_last)
1365                                 bio_endio(bio);
1366                 }
1367
1368                 if (is_last)
1369                         return;
1370
1371                 swap(bvec_iter.bi_size, bytes);
1372                 bio_advance_iter(bio, &bvec_iter, bytes);
1373         }
1374
1375         /*
1376          * If we get here, it better have been because there was an error
1377          * reading a btree node
1378          */
1379         ret = bch_btree_iter_unlock(&iter);
1380         BUG_ON(!ret);
1381         bcache_io_error(c, bio, "btree IO error %i", ret);
1382         bio_endio(bio);
1383 }
1384
1385 void bch_read(struct bch_fs *c, struct bch_read_bio *bio, u64 inode)
1386 {
1387         bch_increment_clock(c, bio_sectors(&bio->bio), READ);
1388
1389         bch_read_iter(c, bio, bio->bio.bi_iter, inode,
1390                       BCH_READ_RETRY_IF_STALE|
1391                       BCH_READ_PROMOTE|
1392                       BCH_READ_MAY_REUSE_BIO|
1393                       BCH_READ_USER_MAPPED);
1394 }
1395
1396 /**
1397  * bch_read_retry - re-submit a bio originally from bch_read()
1398  */
1399 static void bch_read_retry(struct bch_fs *c, struct bch_read_bio *rbio)
1400 {
1401         struct bch_read_bio *parent = bch_rbio_parent(rbio);
1402         struct bvec_iter iter = rbio->parent_iter;
1403         unsigned flags = rbio->flags;
1404         u64 inode = rbio->inode;
1405
1406         trace_bcache_read_retry(&rbio->bio);
1407
1408         if (rbio->split)
1409                 bch_rbio_free(rbio);
1410         else
1411                 rbio->bio.bi_end_io = rbio->orig_bi_end_io;
1412
1413         bch_read_iter(c, parent, iter, inode, flags);
1414 }
1415
1416 void bch_read_retry_work(struct work_struct *work)
1417 {
1418         struct bch_fs *c = container_of(work, struct bch_fs,
1419                                            read_retry_work);
1420         struct bch_read_bio *rbio;
1421         struct bio *bio;
1422         unsigned long flags;
1423
1424         while (1) {
1425                 spin_lock_irqsave(&c->read_retry_lock, flags);
1426                 bio = bio_list_pop(&c->read_retry_list);
1427                 spin_unlock_irqrestore(&c->read_retry_lock, flags);
1428
1429                 if (!bio)
1430                         break;
1431
1432                 rbio = container_of(bio, struct bch_read_bio, bio);
1433                 bch_read_retry(c, rbio);
1434         }
1435 }