[bcachefs-tools-debian] / libbcache / io.c
1 /*
2  * Some low level IO code, and hacks for various block layer limitations
3  *
4  * Copyright 2010, 2011 Kent Overstreet <kent.overstreet@gmail.com>
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcache.h"
9 #include "alloc.h"
10 #include "bset.h"
11 #include "btree_update.h"
12 #include "buckets.h"
13 #include "checksum.h"
14 #include "compress.h"
15 #include "clock.h"
16 #include "debug.h"
17 #include "error.h"
18 #include "extents.h"
19 #include "io.h"
20 #include "journal.h"
21 #include "keylist.h"
22 #include "move.h"
23 #include "notify.h"
24 #include "stats.h"
25 #include "super.h"
26
27 #include <linux/blkdev.h>
28 #include <linux/random.h>
29
30 #include <trace/events/bcache.h>
31
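/*
 * Mirrors the kernel's bio_inc_remaining(): flag @bio as chained and bump
 * __bi_remaining, so that bio_endio() won't complete it until every
 * chained completion has come in.
 */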
32 static inline void __bio_inc_remaining(struct bio *bio)
33 {
34         bio_set_flag(bio, BIO_CHAIN);
35         smp_mb__before_atomic();
36         atomic_inc(&bio->__bi_remaining);
37 }
38
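/*
 * If we're being called from under generic_make_request() (i.e.
 * current->bio_list is set), punt the bio to a workqueue instead of
 * recursing, so submission can't deadlock; otherwise submit directly.
 */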
39 void bch_generic_make_request(struct bio *bio, struct cache_set *c)
40 {
41         if (current->bio_list) {
42                 spin_lock(&c->bio_submit_lock);
43                 bio_list_add(&c->bio_submit_list, bio);
44                 spin_unlock(&c->bio_submit_lock);
45                 queue_work(bcache_io_wq, &c->bio_submit_work);
46         } else {
47                 generic_make_request(bio);
48         }
49 }
50
51 void bch_bio_submit_work(struct work_struct *work)
52 {
53         struct cache_set *c = container_of(work, struct cache_set,
54                                            bio_submit_work);
55         struct bio_list bl;
56         struct bio *bio;
57
58         spin_lock(&c->bio_submit_lock);
59         bl = c->bio_submit_list;
60         bio_list_init(&c->bio_submit_list);
61         spin_unlock(&c->bio_submit_lock);
62
63         while ((bio = bio_list_pop(&bl)))
64                 generic_make_request(bio);
65 }
66
67 /* Allocate, free from mempool: */
68
69 void bch_bio_free_pages_pool(struct cache_set *c, struct bio *bio)
70 {
71         struct bio_vec *bv;
72         unsigned i;
73
74         bio_for_each_segment_all(bv, bio, i)
75                 if (bv->bv_page != ZERO_PAGE(0))
76                         mempool_free(bv->bv_page, &c->bio_bounce_pages);
77         bio->bi_vcnt = 0;
78 }
79
80 static void bch_bio_alloc_page_pool(struct cache_set *c, struct bio *bio,
81                                     bool *using_mempool)
82 {
83         struct bio_vec *bv = &bio->bi_io_vec[bio->bi_vcnt++];
84
85         if (likely(!*using_mempool)) {
86                 bv->bv_page = alloc_page(GFP_NOIO);
87                 if (unlikely(!bv->bv_page)) {
88                         mutex_lock(&c->bio_bounce_pages_lock);
89                         *using_mempool = true;
90                         goto pool_alloc;
91
92                 }
93         } else {
94 pool_alloc:
95                 bv->bv_page = mempool_alloc(&c->bio_bounce_pages, GFP_NOIO);
96         }
97
98         bv->bv_len = PAGE_SIZE;
99         bv->bv_offset = 0;
100 }
101
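/*
 * Allocate enough pages to back @bytes worth of data in @bio: pages come
 * from alloc_page() while that succeeds, then from the bio_bounce_pages
 * mempool (serialized by bio_bounce_pages_lock) once it fails.
 */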
102 void bch_bio_alloc_pages_pool(struct cache_set *c, struct bio *bio,
103                               size_t bytes)
104 {
105         bool using_mempool = false;
106
107         bio->bi_iter.bi_size = bytes;
108
109         while (bio->bi_vcnt < DIV_ROUND_UP(bytes, PAGE_SIZE))
110                 bch_bio_alloc_page_pool(c, bio, &using_mempool);
111
112         if (using_mempool)
113                 mutex_unlock(&c->bio_bounce_pages_lock);
114 }
115
116 /* Bios with headers */
117
118 static void bch_submit_wbio(struct cache_set *c, struct bch_write_bio *wbio,
119                             struct cache *ca, const struct bch_extent_ptr *ptr,
120                             bool punt)
121 {
122         wbio->ca                = ca;
123         wbio->submit_time_us    = local_clock_us();
124         wbio->bio.bi_iter.bi_sector = ptr->offset;
125         wbio->bio.bi_bdev       = ca ? ca->disk_sb.bdev : NULL;
126
127         if (!ca)
128                 bcache_io_error(c, &wbio->bio, "device has been removed");
129         else if (punt)
130                 bch_generic_make_request(&wbio->bio, c);
131         else
132                 generic_make_request(&wbio->bio);
133 }
134
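/*
 * Submit the write described by @k to each of its pointers: the last
 * pointer reuses @wbio itself, earlier pointers get clones from the target
 * device's replica_set bioset, each chained to the original with
 * __bio_inc_remaining(); REQ_FUA is added for devices the journal doesn't
 * flush.
 */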
135 void bch_submit_wbio_replicas(struct bch_write_bio *wbio, struct cache_set *c,
136                               const struct bkey_i *k, bool punt)
137 {
138         struct bkey_s_c_extent e = bkey_i_to_s_c_extent(k);
139         const struct bch_extent_ptr *ptr;
140         struct bch_write_bio *n;
141         struct cache *ca;
142
143         wbio->split = false;
144         wbio->c = c;
145
146         extent_for_each_ptr(e, ptr) {
147                 rcu_read_lock();
148                 ca = PTR_CACHE(c, ptr);
149                 if (ca)
150                         percpu_ref_get(&ca->ref);
151                 rcu_read_unlock();
152
153                 if (!ca) {
154                         bch_submit_wbio(c, wbio, ca, ptr, punt);
155                         break;
156                 }
157
158                 if (ptr + 1 < &extent_entry_last(e)->ptr) {
159                         n = to_wbio(bio_clone_fast(&wbio->bio, GFP_NOIO,
160                                                    &ca->replica_set));
161
162                         n->bio.bi_end_io        = wbio->bio.bi_end_io;
163                         n->bio.bi_private       = wbio->bio.bi_private;
164                         n->c                    = c;
165                         n->orig                 = &wbio->bio;
166                         n->bounce               = false;
167                         n->split                = true;
168                         n->put_bio              = true;
169                         n->bio.bi_opf           = wbio->bio.bi_opf;
170                         __bio_inc_remaining(n->orig);
171                 } else {
172                         n = wbio;
173                 }
174
175                 if (!journal_flushes_device(ca))
176                         n->bio.bi_opf |= REQ_FUA;
177
178                 bch_submit_wbio(c, n, ca, ptr, punt);
179         }
180 }
181
182 /* IO errors */
183
184 /* Writes */
185
186 static struct workqueue_struct *index_update_wq(struct bch_write_op *op)
187 {
188         return op->alloc_reserve == RESERVE_MOVINGGC
189                 ? op->c->copygc_wq
190                 : op->c->wq;
191 }
192
193 static void __bch_write(struct closure *);
194
195 static void bch_write_done(struct closure *cl)
196 {
197         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
198
199         BUG_ON(!(op->flags & BCH_WRITE_DONE));
200
201         if (!op->error && (op->flags & BCH_WRITE_FLUSH))
202                 op->error = bch_journal_error(&op->c->journal);
203
204         bch_disk_reservation_put(op->c, &op->res);
205         percpu_ref_put(&op->c->writes);
206         bch_keylist_free(&op->insert_keys, op->inline_keys);
207         closure_return(cl);
208 }
209
210 static u64 keylist_sectors(struct keylist *keys)
211 {
212         struct bkey_i *k;
213         u64 ret = 0;
214
215         for_each_keylist_key(keys, k)
216                 ret += k->k.size;
217
218         return ret;
219 }
220
221 static int bch_write_index_default(struct bch_write_op *op)
222 {
223         struct keylist *keys = &op->insert_keys;
224         struct btree_iter iter;
225         int ret;
226
227         bch_btree_iter_init_intent(&iter, op->c, BTREE_ID_EXTENTS,
228                 bkey_start_pos(&bch_keylist_front(keys)->k));
229
230         ret = bch_btree_insert_list_at(&iter, keys, &op->res,
231                                        NULL, op_journal_seq(op),
232                                        BTREE_INSERT_NOFAIL);
233         bch_btree_iter_unlock(&iter);
234
235         return ret;
236 }
237
238 /**
239  * bch_write_index - after a write, update index to point to new data
240  */
241 static void bch_write_index(struct closure *cl)
242 {
243         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
244         struct cache_set *c = op->c;
245         struct keylist *keys = &op->insert_keys;
246         unsigned i;
247
248         op->flags |= BCH_WRITE_LOOPED;
249
250         if (!bch_keylist_empty(keys)) {
251                 u64 sectors_start = keylist_sectors(keys);
252                 int ret = op->index_update_fn(op);
253
254                 BUG_ON(keylist_sectors(keys) && !ret);
255
256                 op->written += sectors_start - keylist_sectors(keys);
257
258                 if (ret) {
259                         __bcache_io_error(c, "btree IO error %i", ret);
260                         op->error = ret;
261                 }
262         }
263
264         for (i = 0; i < ARRAY_SIZE(op->open_buckets); i++)
265                 if (op->open_buckets[i]) {
266                         bch_open_bucket_put(c,
267                                             c->open_buckets +
268                                             op->open_buckets[i]);
269                         op->open_buckets[i] = 0;
270                 }
271
272         if (!(op->flags & BCH_WRITE_DONE))
273                 continue_at(cl, __bch_write, op->io_wq);
274
275         if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
276                 bch_journal_flush_seq_async(&c->journal,
277                                             *op_journal_seq(op),
278                                             cl);
279                 continue_at(cl, bch_write_done, index_update_wq(op));
280         } else {
281                 continue_at_nobarrier(cl, bch_write_done, NULL);
282         }
283 }
284
285 /**
286  * bch_write_discard - discard range of keys
287  *
288  * Used to implement discard, and to handle the case where a writethrough
289  * write hits a write error on the cache device.
290  */
291 static void bch_write_discard(struct closure *cl)
292 {
293         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
294         struct bio *bio = &op->bio->bio;
295         struct bpos end = op->pos;
296
297         end.offset += bio_sectors(bio);
298
299         op->error = bch_discard(op->c, op->pos, end, op->version,
300                                 &op->res, NULL, NULL);
301 }
302
303 /*
304  * Convert extents to be inserted to discards after an error:
305  */
306 static void bch_write_io_error(struct closure *cl)
307 {
308         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
309
310         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
311                 struct bkey_i *src = bch_keylist_front(&op->insert_keys);
312                 struct bkey_i *dst = bch_keylist_front(&op->insert_keys);
313
314                 /*
315                  * Our data write just errored, which means we've got a bunch
316                  * of keys to insert that point to data that wasn't
317                  * successfully written.
318                  *
319                  * We don't have to insert those keys but we still have to
320                  * invalidate that region of the cache - so, if we just strip
321                  * off all the pointers from the keys we'll accomplish just
322                  * that.
323                  */
324
325                 while (src != op->insert_keys.top) {
326                         struct bkey_i *n = bkey_next(src);
327
328                         set_bkey_val_u64s(&src->k, 0);
329                         src->k.type = KEY_TYPE_DISCARD;
330                         bkey_copy(dst, src);
331
332                         dst = bkey_next(dst);
333                         src = n;
334                 }
335
336                 op->insert_keys.top = dst;
337                 op->flags |= BCH_WRITE_DISCARD;
338         } else {
339                 /* TODO: We could try to recover from this. */
340                 while (!bch_keylist_empty(&op->insert_keys))
341                         bch_keylist_pop_front(&op->insert_keys);
342
343                 op->error = -EIO;
344                 op->flags |= BCH_WRITE_DONE;
345         }
346
347         bch_write_index(cl);
348 }
349
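/*
 * Completion for a (possibly cloned) data write bio: on an IO error the
 * op's closure is redirected to bch_write_io_error(), so the keys for the
 * failed write get turned into discards or dropped. Either way we account
 * the IO, propagate any error to the parent bio, free bounce pages, and
 * drop our ref on the bio and closure.
 */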
350 static void bch_write_endio(struct bio *bio)
351 {
352         struct closure *cl = bio->bi_private;
353         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
354         struct bch_write_bio *wbio = to_wbio(bio);
355         struct cache_set *c = wbio->c;
356         struct bio *orig = wbio->orig;
357         struct cache *ca = wbio->ca;
358
359         if (cache_nonfatal_io_err_on(bio->bi_error, ca,
360                                      "data write"))
361                 set_closure_fn(cl, bch_write_io_error, index_update_wq(op));
362
363         bch_account_io_completion_time(ca, wbio->submit_time_us,
364                                        REQ_OP_WRITE);
365         if (ca)
366                 percpu_ref_put(&ca->ref);
367
368         if (bio->bi_error && orig)
369                 orig->bi_error = bio->bi_error;
370
371         if (wbio->bounce)
372                 bch_bio_free_pages_pool(c, bio);
373
374         if (wbio->put_bio)
375                 bio_put(bio);
376
377         if (orig)
378                 bio_endio(orig);
379         else
380                 closure_put(cl);
381 }
382
383 static void init_append_extent(struct bch_write_op *op,
384                                unsigned compressed_size,
385                                unsigned uncompressed_size,
386                                unsigned compression_type,
387                                u64 csum, unsigned csum_type,
388                                struct open_bucket *ob)
389 {
390         struct bkey_i_extent *e = bkey_extent_init(op->insert_keys.top);
391
392         op->pos.offset += uncompressed_size;
393         e->k.p = op->pos;
394         e->k.size = uncompressed_size;
395
396         bch_extent_crc_append(e, compressed_size,
397                               uncompressed_size,
398                               compression_type,
399                               csum, csum_type);
400
401         bch_alloc_sectors_append_ptrs(op->c, e, op->nr_replicas,
402                                       ob, compressed_size);
403
404         bkey_extent_set_cached(&e->k, (op->flags & BCH_WRITE_CACHED));
405         bch_keylist_push(&op->insert_keys);
406 }
407
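/*
 * Write as much of @orig as fits in the open bucket @ob, appending an
 * extent key (with checksum/compression info) to op->insert_keys for each
 * fragment written. Returns a positive value if some of @orig remains to
 * be written, 0 if this was the last extent, or a negative error.
 */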
408 static int bch_write_extent(struct bch_write_op *op,
409                             struct open_bucket *ob,
410                             struct bio *orig)
411 {
412         struct cache_set *c = op->c;
413         struct bio *bio;
414         struct bch_write_bio *wbio;
415         unsigned key_to_write_offset = op->insert_keys.top_p -
416                 op->insert_keys.keys_p;
417         struct bkey_i *key_to_write;
418         unsigned csum_type = c->opts.data_checksum;
419         unsigned compression_type = op->compression_type;
420         int ret;
421
422         /* don't refetch csum type/compression type */
423         barrier();
424
425         /* Need to decompress data? */
426         if ((op->flags & BCH_WRITE_DATA_COMPRESSED) &&
427             (op->crc.uncompressed_size != op->size ||
428              op->crc.compressed_size > ob->sectors_free)) {
429                 int ret;
430
431                 ret = bch_bio_uncompress_inplace(c, orig, op->size, op->crc);
432                 if (ret)
433                         return ret;
434
435                 op->flags &= ~BCH_WRITE_DATA_COMPRESSED;
436         }
437
438         if (op->flags & BCH_WRITE_DATA_COMPRESSED) {
439                 init_append_extent(op,
440                                    op->crc.compressed_size,
441                                    op->crc.uncompressed_size,
442                                    op->crc.compression_type,
443                                    op->crc.csum,
444                                    op->crc.csum_type,
445                                    ob);
446
447                 bio                     = orig;
448                 wbio                    = to_wbio(bio);
449                 wbio->orig              = NULL;
450                 wbio->bounce            = false;
451                 wbio->put_bio           = false;
452                 ret                     = 0;
453         } else if (csum_type != BCH_CSUM_NONE ||
454                    compression_type != BCH_COMPRESSION_NONE) {
455                 /* all units here in bytes */
456                 unsigned total_output = 0, output_available =
457                         min(ob->sectors_free << 9, orig->bi_iter.bi_size);
458                 u64 csum;
459
460                 bio = bio_alloc_bioset(GFP_NOIO,
461                                        DIV_ROUND_UP(output_available, PAGE_SIZE),
462                                        &c->bio_write);
463                 /*
464                  * XXX: can't use mempool for more than
465                  * BCH_COMPRESSED_EXTENT_MAX worth of pages
466                  */
467                 bch_bio_alloc_pages_pool(c, bio, output_available);
468
469                 /* copy WRITE_SYNC flag */
470                 bio->bi_opf             = orig->bi_opf;
471                 wbio                    = to_wbio(bio);
472                 wbio->orig              = NULL;
473                 wbio->bounce            = true;
474                 wbio->put_bio           = true;
475
476                 do {
477                         unsigned fragment_compression_type = compression_type;
478                         size_t dst_len, src_len;
479
480                         bch_bio_compress(c, bio, &dst_len,
481                                          orig, &src_len,
482                                          &fragment_compression_type);
483
484                         BUG_ON(!dst_len || dst_len > bio->bi_iter.bi_size);
485                         BUG_ON(!src_len || src_len > orig->bi_iter.bi_size);
486                         BUG_ON(dst_len & (block_bytes(c) - 1));
487                         BUG_ON(src_len & (block_bytes(c) - 1));
488
489                         swap(bio->bi_iter.bi_size, dst_len);
490                         csum = bch_checksum_bio(bio, csum_type);
491                         swap(bio->bi_iter.bi_size, dst_len);
492
493                         init_append_extent(op,
494                                            dst_len >> 9, src_len >> 9,
495                                            fragment_compression_type,
496                                            csum, csum_type, ob);
497
498                         total_output += dst_len;
499                         bio_advance(bio, dst_len);
500                         bio_advance(orig, src_len);
501                 } while (bio->bi_iter.bi_size &&
502                          orig->bi_iter.bi_size &&
503                          !bch_keylist_realloc(&op->insert_keys,
504                                               op->inline_keys,
505                                               ARRAY_SIZE(op->inline_keys),
506                                               BKEY_EXTENT_U64s_MAX));
507
508                 BUG_ON(total_output > output_available);
509
510                 memset(&bio->bi_iter, 0, sizeof(bio->bi_iter));
511                 bio->bi_iter.bi_size = total_output;
512
513                 /*
514                  * Free unneeded pages after compressing:
515                  */
516                 while (bio->bi_vcnt * PAGE_SIZE >
517                        round_up(bio->bi_iter.bi_size, PAGE_SIZE))
518                         mempool_free(bio->bi_io_vec[--bio->bi_vcnt].bv_page,
519                                      &c->bio_bounce_pages);
520
521                 ret = orig->bi_iter.bi_size != 0;
522         } else {
523                 bio = bio_next_split(orig, ob->sectors_free, GFP_NOIO,
524                                      &c->bio_write);
525
526                 wbio                    = to_wbio(bio);
527                 wbio->orig              = NULL;
528                 wbio->bounce            = false;
529                 wbio->put_bio           = bio != orig;
530
531                 init_append_extent(op, bio_sectors(bio), bio_sectors(bio),
532                                    compression_type, 0, csum_type, ob);
533
534                 ret = bio != orig;
535         }
536
537         bio->bi_end_io  = bch_write_endio;
538         bio->bi_private = &op->cl;
539         bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
540
541         closure_get(bio->bi_private);
542
543         /* might have done a realloc... */
544
545         key_to_write = (void *) (op->insert_keys.keys_p + key_to_write_offset);
546
547         if (!(op->flags & BCH_WRITE_CACHED))
548                 bch_check_mark_super(c, key_to_write, false);
549
550 #ifndef CONFIG_BCACHE_NO_IO
551         bch_submit_wbio_replicas(to_wbio(bio), c, key_to_write, false);
552 #else
553         to_wbio(bio)->ca = NULL;
554         bio_endio(bio);
555 #endif
556         return ret;
557 }
558
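/*
 * Main write loop: allocate an open bucket, write as much data as fits,
 * and repeat; whenever the keylist or op->open_buckets fills up (or the
 * write finishes) continue on to bch_write_index() to insert the new keys.
 * Pure discards are handled up front and never allocate.
 */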
559 static void __bch_write(struct closure *cl)
560 {
561         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
562         struct cache_set *c = op->c;
563         struct bio *bio = &op->bio->bio;
564         unsigned open_bucket_nr = 0;
565         struct open_bucket *b;
566         int ret;
567
568         memset(op->open_buckets, 0, sizeof(op->open_buckets));
569
570         if (op->flags & BCH_WRITE_DISCARD) {
571                 op->flags |= BCH_WRITE_DONE;
572                 bch_write_discard(cl);
573                 bio_put(bio);
574                 continue_at(cl, bch_write_done, index_update_wq(op));
575         }
576
577         /*
578          * Journal writes are marked REQ_PREFLUSH; if the original write was a
579          * flush, it'll wait on the journal write.
580          */
581         bio->bi_opf &= ~(REQ_PREFLUSH|REQ_FUA);
582
583         do {
584                 EBUG_ON(bio->bi_iter.bi_sector != op->pos.offset);
585                 EBUG_ON(!bio_sectors(bio));
586
587                 if (open_bucket_nr == ARRAY_SIZE(op->open_buckets))
588                         continue_at(cl, bch_write_index, index_update_wq(op));
589
590                 /* for the device pointers and 1 for the checksum */
591                 if (bch_keylist_realloc(&op->insert_keys,
592                                         op->inline_keys,
593                                         ARRAY_SIZE(op->inline_keys),
594                                         BKEY_EXTENT_U64s_MAX))
595                         continue_at(cl, bch_write_index, index_update_wq(op));
596
597                 b = bch_alloc_sectors_start(c, op->wp, op->nr_replicas,
598                         op->alloc_reserve,
599                         (op->flags & BCH_WRITE_ALLOC_NOWAIT) ? NULL : cl);
600                 EBUG_ON(!b);
601
602                 if (unlikely(IS_ERR(b))) {
603                         if (unlikely(PTR_ERR(b) != -EAGAIN)) {
604                                 ret = PTR_ERR(b);
605                                 goto err;
606                         }
607
608                         /*
609                          * If we already have some keys, must insert them first
610                          * before allocating another open bucket. We only hit
611                          * this case if open_bucket_nr > 1.
612                          */
613                         if (!bch_keylist_empty(&op->insert_keys))
614                                 continue_at(cl, bch_write_index,
615                                             index_update_wq(op));
616
617                         /*
618                          * If we've looped, we're running out of a workqueue -
619                          * not the bch_write() caller's context - and we don't
620                          * want to block the workqueue:
621                          */
622                         if (op->flags & BCH_WRITE_LOOPED)
623                                 continue_at(cl, __bch_write, op->io_wq);
624
625                         /*
626                          * Otherwise, we do want to block the caller on alloc
627                          * failure instead of letting it queue up more and more
628                          * writes:
629                          * XXX: this technically needs a try_to_freeze() -
630                          * except that that's not safe because caller may have
631                          * issued other IO... hmm..
632                          */
633                         closure_sync(cl);
634                         continue;
635                 }
636
637                 BUG_ON(b - c->open_buckets == 0 ||
638                        b - c->open_buckets > U8_MAX);
639                 op->open_buckets[open_bucket_nr++] = b - c->open_buckets;
640
641                 ret = bch_write_extent(op, b, bio);
642
643                 bch_alloc_sectors_done(c, op->wp, b);
644
645                 if (ret < 0)
646                         goto err;
647         } while (ret);
648
649         op->flags |= BCH_WRITE_DONE;
650         continue_at(cl, bch_write_index, index_update_wq(op));
651 err:
652         if (op->flags & BCH_WRITE_DISCARD_ON_ERROR) {
653                 /*
654                  * If we were writing cached data, not doing the write is fine
655                  * so long as we discard whatever would have been overwritten -
656                  * then it's equivalent to doing the write and immediately
657                  * reclaiming it.
658                  */
659
660                 bch_write_discard(cl);
661         } else {
662                 /*
663                  * Right now we can only error here if we went RO - the
664                  * allocation failed, but we already checked for -ENOSPC when we
665                  * got our reservation.
666                  *
667                  * XXX capacity might have changed, but we don't check for that
668                  * yet:
669                  */
670                 op->error = ret;
671         }
672
673         op->flags |= BCH_WRITE_DONE;
674
675         /*
676          * No reason not to insert keys for whatever data was successfully
677          * written (especially for a cmpxchg operation that's moving data
678          * around)
679          */
680         continue_at(cl, !bch_keylist_empty(&op->insert_keys)
681                     ? bch_write_index
682                     : bch_write_done, index_update_wq(op));
683 }
684
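/*
 * Timer callback for throttled foreground writes: drop the wait list's ref
 * on every op whose delay has expired (or on all of them, if the cache set
 * is going read-only or stopping) so their writes can proceed, and re-arm
 * the timer for the first op that still has to wait.
 */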
685 void bch_wake_delayed_writes(unsigned long data)
686 {
687         struct cache_set *c = (void *) data;
688         struct bch_write_op *op;
689         unsigned long flags;
690
691         spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
692
693         while ((op = c->write_wait_head)) {
694                 if (!test_bit(CACHE_SET_RO, &c->flags) &&
695                     !test_bit(CACHE_SET_STOPPING, &c->flags) &&
696                     time_after(op->expires, jiffies)) {
697                         mod_timer(&c->foreground_write_wakeup, op->expires);
698                         break;
699                 }
700
701                 c->write_wait_head = op->next;
702                 if (!c->write_wait_head)
703                         c->write_wait_tail = NULL;
704
705                 closure_put(&op->cl);
706         }
707
708         spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
709 }
710
711 /**
712  * bch_write - handle a write to a cache device or flash only volume
713  *
714  * This is the starting point for any data to end up in a cache device; it could
715  * be from a normal write, or a writeback write, or a write to a flash only
716  * volume - it's also used by the moving garbage collector to compact data in
717  * mostly empty buckets.
718  *
719  * It first writes the data to the cache, creating a list of keys to be inserted
720  * (if the data won't fit in a single open bucket, there will be multiple keys);
721  * after the data is written it calls bch_journal, and after the keys have been
722  * added to the next journal write they're inserted into the btree.
723  *
724  * It inserts the data in op->bio; bi_sector is used for the key offset, and
725  * op->inode is used for the key inode.
726  *
727  * If op->discard is true, instead of inserting the data it invalidates the
728  * region of the cache represented by op->bio and op->inode.
729  */
730 void bch_write(struct closure *cl)
731 {
732         struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
733         struct bio *bio = &op->bio->bio;
734         struct cache_set *c = op->c;
735         u64 inode = op->pos.inode;
736
737         trace_bcache_write(c, inode, bio,
738                            !(op->flags & BCH_WRITE_CACHED),
739                            op->flags & BCH_WRITE_DISCARD);
740
741         if (!percpu_ref_tryget(&c->writes)) {
742                 __bcache_io_error(c, "read only");
743                 op->error = -EROFS;
744                 bch_disk_reservation_put(c, &op->res);
745                 closure_return(cl);
746         }
747
748         if (!(op->flags & BCH_WRITE_DISCARD))
749                 bch_increment_clock(c, bio_sectors(bio), WRITE);
750
751         if (!(op->flags & BCH_WRITE_DISCARD))
752                 bch_mark_foreground_write(c, bio_sectors(bio));
753         else
754                 bch_mark_discard(c, bio_sectors(bio));
755
756         /* Don't bother throttling if the rate is >= 1 GB/sec */
757
758         if (c->foreground_write_ratelimit_enabled &&
759             c->foreground_write_pd.rate.rate < (1 << 30) &&
760             !(op->flags & BCH_WRITE_DISCARD) && op->wp->throttle) {
761                 unsigned long flags;
762                 u64 delay;
763
764                 spin_lock_irqsave(&c->foreground_write_pd_lock, flags);
765                 bch_ratelimit_increment(&c->foreground_write_pd.rate,
766                                         bio->bi_iter.bi_size);
767
768                 delay = bch_ratelimit_delay(&c->foreground_write_pd.rate);
769
770                 if (delay >= HZ / 100) {
771                         trace_bcache_write_throttle(c, inode, bio, delay);
772
773                         closure_get(&op->cl); /* list takes a ref */
774
775                         op->expires = jiffies + delay;
776                         op->next = NULL;
777
778                         if (c->write_wait_tail)
779                                 c->write_wait_tail->next = op;
780                         else
781                                 c->write_wait_head = op;
782                         c->write_wait_tail = op;
783
784                         if (!timer_pending(&c->foreground_write_wakeup))
785                                 mod_timer(&c->foreground_write_wakeup,
786                                           op->expires);
787
788                         spin_unlock_irqrestore(&c->foreground_write_pd_lock,
789                                                flags);
790                         continue_at(cl, __bch_write, index_update_wq(op));
791                 }
792
793                 spin_unlock_irqrestore(&c->foreground_write_pd_lock, flags);
794         }
795
796         continue_at_nobarrier(cl, __bch_write, NULL);
797 }
798
799 void bch_write_op_init(struct bch_write_op *op, struct cache_set *c,
800                        struct bch_write_bio *bio, struct disk_reservation res,
801                        struct write_point *wp, struct bpos pos,
802                        u64 *journal_seq, unsigned flags)
803 {
804         op->c           = c;
805         op->io_wq       = index_update_wq(op);
806         op->bio         = bio;
807         op->written     = 0;
808         op->error       = 0;
809         op->flags       = flags;
810         op->compression_type = c->opts.compression;
811         op->nr_replicas = res.nr_replicas;
812         op->alloc_reserve = RESERVE_NONE;
813         op->pos         = pos;
814         op->version     = 0;
815         op->res         = res;
816         op->wp          = wp;
817
818         if (journal_seq) {
819                 op->journal_seq_p = journal_seq;
820                 op->flags |= BCH_WRITE_JOURNAL_SEQ_PTR;
821         } else {
822                 op->journal_seq = 0;
823         }
824
825         op->index_update_fn = bch_write_index_default;
826
827         bch_keylist_init(&op->insert_keys,
828                          op->inline_keys,
829                          ARRAY_SIZE(op->inline_keys));
830
831         if (version_stress_test(c))
832                 get_random_bytes(&op->version, sizeof(op->version));
833 }
834
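/*
 * Illustrative usage sketch (not code from this file): a caller sets up a
 * bch_write_op - normally embedded in a longer-lived structure, as the
 * promote path below embeds one in struct migrate_write - and then kicks
 * it off as a closure:
 *
 *	bch_write_op_init(op, c, wbio, res, wp, pos, journal_seq, flags);
 *	closure_call(&op->cl, bch_write, NULL, parent_cl);
 *
 * Here wbio, res, wp, pos, journal_seq, flags and parent_cl stand for
 * whatever the caller already has; they are placeholders, not new API.
 */
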
835 /* Discard */
836
837 /* bch_discard - discard a range of keys from start to end.
838  * @c           cache set
839  * @start       position the discard starts at
840  *              NOTE: the discard begins at start.offset
841  * @end         position the discard ends at
842  *              NOTE: the discard ends at end.offset (exclusive)
843  * @version     version of discard (0ULL if none)
844  *
845  * Returns:
846  *       0 on success
847  *      <0 on error
848  *
849  * XXX: this needs to be refactored with inode_truncate, or more
850  *      appropriately inode_truncate should call this
851  */
852 int bch_discard(struct cache_set *c, struct bpos start,
853                 struct bpos end, u64 version,
854                 struct disk_reservation *disk_res,
855                 struct extent_insert_hook *hook,
856                 u64 *journal_seq)
857 {
858         return bch_btree_delete_range(c, BTREE_ID_EXTENTS, start, end, version,
859                                       disk_res, hook, journal_seq);
860 }
861
862 /* Cache promotion on read */
863
864 struct cache_promote_op {
865         struct closure          cl;
866         struct migrate_write    write;
867         struct bio_vec          bi_inline_vecs[0]; /* must be last */
868 };
869
870 /* Read */
871
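/*
 * Verify the checksum and copy/decompress the data we read back into the
 * parent bio; returns 0 on success or a negative error (e.g. -EIO on a
 * checksum mismatch).
 */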
872 static int bio_checksum_uncompress(struct cache_set *c,
873                                    struct bch_read_bio *rbio)
874 {
875         struct bio *src = &rbio->bio;
876         struct bio *dst = &bch_rbio_parent(rbio)->bio;
877         struct bvec_iter dst_iter = rbio->parent_iter;
878         u64 csum;
879         int ret = 0;
880
881         /*
882          * reset iterator for checksumming and copying bounced data: here we've
883          * set rbio->crc.compressed_size to the amount of data we actually read,
884          * which was not necessarily the full extent if we were only bouncing
885          * in order to promote
886          */
887         if (rbio->bounce) {
888                 src->bi_iter.bi_size            = rbio->crc.compressed_size << 9;
889                 src->bi_iter.bi_idx             = 0;
890                 src->bi_iter.bi_bvec_done       = 0;
891         } else {
892                 src->bi_iter = rbio->parent_iter;
893         }
894
895         csum = bch_checksum_bio(src, rbio->crc.csum_type);
896         if (cache_nonfatal_io_err_on(rbio->crc.csum != csum, rbio->ca,
897                         "data checksum error, inode %llu offset %llu: expected %0llx got %0llx (type %u)",
898                         rbio->inode, (u64) rbio->parent_iter.bi_sector << 9,
899                         rbio->crc.csum, csum, rbio->crc.csum_type))
900                 ret = -EIO;
901
902         /*
903          * If there was a checksum error, still copy the data back - unless it
904          * was compressed, we don't want to decompress bad data:
905          */
906         if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
907                 if (!ret) {
908                         ret = bch_bio_uncompress(c, src, dst,
909                                                  dst_iter, rbio->crc);
910                         if (ret)
911                                 __bcache_io_error(c, "decompression error");
912                 }
913         } else if (rbio->bounce) {
914                 bio_advance(src, rbio->crc.offset << 9);
915                 bio_copy_data_iter(dst, dst_iter,
916                                    src, src->bi_iter);
917         }
918
919         return ret;
920 }
921
922 static void bch_rbio_free(struct cache_set *c, struct bch_read_bio *rbio)
923 {
924         struct bio *bio = &rbio->bio;
925
926         BUG_ON(rbio->ca);
927         BUG_ON(!rbio->split);
928
929         if (rbio->promote)
930                 kfree(rbio->promote);
931         if (rbio->bounce)
932                 bch_bio_free_pages_pool(c, bio);
933
934         bio_put(bio);
935 }
936
937 static void bch_rbio_done(struct cache_set *c, struct bch_read_bio *rbio)
938 {
939         struct bio *orig = &bch_rbio_parent(rbio)->bio;
940
941         percpu_ref_put(&rbio->ca->ref);
942         rbio->ca = NULL;
943
944         if (rbio->split) {
945                 if (rbio->bio.bi_error)
946                         orig->bi_error = rbio->bio.bi_error;
947
948                 bio_endio(orig);
949                 bch_rbio_free(c, rbio);
950         } else {
951                 if (rbio->promote)
952                         kfree(rbio->promote);
953
954                 orig->bi_end_io = rbio->orig_bi_end_io;
955                 bio_endio_nodec(orig);
956         }
957 }
958
959 /*
960  * Decide if we want to retry the read: if so, the rbio is queued for retry;
961  * otherwise the error is reported on the parent bio and it is completed
962  */
963 static void bch_read_error_maybe_retry(struct cache_set *c,
964                                        struct bch_read_bio *rbio,
965                                        int error)
966 {
967         unsigned long flags;
968
969         if ((error == -EINTR) &&
970             (rbio->flags & BCH_READ_RETRY_IF_STALE)) {
971                 atomic_long_inc(&c->cache_read_races);
972                 goto retry;
973         }
974
975         if (error == -EIO) {
976                 /* io error - do we have another replica? */
977         }
978
979         bch_rbio_parent(rbio)->bio.bi_error = error;
980         bch_rbio_done(c, rbio);
981         return;
982 retry:
983         percpu_ref_put(&rbio->ca->ref);
984         rbio->ca = NULL;
985
986         spin_lock_irqsave(&c->read_retry_lock, flags);
987         bio_list_add(&c->read_retry_list, &rbio->bio);
988         spin_unlock_irqrestore(&c->read_retry_lock, flags);
989         queue_work(c->wq, &c->read_retry_work);
990 }
991
992 static void cache_promote_done(struct closure *cl)
993 {
994         struct cache_promote_op *op =
995                 container_of(cl, struct cache_promote_op, cl);
996
997         bch_bio_free_pages_pool(op->write.op.c, &op->write.wbio.bio);
998         kfree(op);
999 }
1000
1001 /* Inner part that may run in process context */
1002 static void __bch_read_endio(struct cache_set *c, struct bch_read_bio *rbio)
1003 {
1004         int ret;
1005
1006         ret = bio_checksum_uncompress(c, rbio);
1007         if (ret) {
1008                 bch_read_error_maybe_retry(c, rbio, ret);
1009                 return;
1010         }
1011
1012         if (rbio->promote &&
1013             !test_bit(CACHE_SET_RO, &c->flags) &&
1014             !test_bit(CACHE_SET_STOPPING, &c->flags)) {
1015                 struct cache_promote_op *promote = rbio->promote;
1016                 struct closure *cl = &promote->cl;
1017
1018                 BUG_ON(!rbio->split || !rbio->bounce);
1019
1020                 /* we now own pages: */
1021                 swap(promote->write.wbio.bio.bi_vcnt, rbio->bio.bi_vcnt);
1022                 rbio->promote = NULL;
1023
1024                 bch_rbio_done(c, rbio);
1025
1026                 closure_init(cl, &c->cl);
1027                 closure_call(&promote->write.op.cl, bch_write, c->wq, cl);
1028                 closure_return_with_destructor(cl, cache_promote_done);
1029         } else {
1030                 bch_rbio_done(c, rbio);
1031         }
1032 }
1033
1034 void bch_bio_decompress_work(struct work_struct *work)
1035 {
1036         struct bio_decompress_worker *d =
1037                 container_of(work, struct bio_decompress_worker, work);
1038         struct llist_node *list, *next;
1039         struct bch_read_bio *rbio;
1040
1041         while ((list = llist_del_all(&d->bio_list)))
1042                 for (list = llist_reverse_order(list);
1043                      list;
1044                      list = next) {
1045                         next = llist_next(list);
1046                         rbio = container_of(list, struct bch_read_bio, list);
1047
1048                         __bch_read_endio(d->c, rbio);
1049                 }
1050 }
1051
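/*
 * Completion for a data read bio: a stale pointer is treated as -EINTR so
 * the read can be retried against the current key; on error hand off to
 * bch_read_error_maybe_retry(), otherwise punt compressed extents to the
 * per-cpu decompress workqueue and finish everything else inline.
 */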
1052 static void bch_read_endio(struct bio *bio)
1053 {
1054         struct bch_read_bio *rbio =
1055                 container_of(bio, struct bch_read_bio, bio);
1056         struct cache_set *c = rbio->ca->set;
1057         int stale = ((rbio->flags & BCH_READ_RETRY_IF_STALE) && race_fault()) ||
1058                 ptr_stale(rbio->ca, &rbio->ptr) ? -EINTR : 0;
1059         int error = bio->bi_error ?: stale;
1060
1061         bch_account_io_completion_time(rbio->ca, rbio->submit_time_us, REQ_OP_READ);
1062
1063         cache_nonfatal_io_err_on(bio->bi_error, rbio->ca, "data read");
1064
1065         if (error) {
1066                 bch_read_error_maybe_retry(c, rbio, error);
1067                 return;
1068         }
1069
1070         if (rbio->crc.compression_type != BCH_COMPRESSION_NONE) {
1071                 struct bio_decompress_worker *d;
1072
1073                 preempt_disable();
1074                 d = this_cpu_ptr(c->bio_decompress_worker);
1075                 llist_add(&rbio->list, &d->bio_list);
1076                 queue_work(system_unbound_wq, &d->work);
1077                 preempt_enable();
1078         } else {
1079                 __bch_read_endio(c, rbio);
1080         }
1081 }
1082
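/*
 * Read the part of extent @k selected by @iter, using the pointer chosen
 * in @pick: decide whether to bounce (for checksum/decompression, or to
 * promote to a faster tier), whether to clone or reuse @orig, then submit
 * the read to the device.
 */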
1083 void bch_read_extent_iter(struct cache_set *c, struct bch_read_bio *orig,
1084                           struct bvec_iter iter, struct bkey_s_c k,
1085                           struct extent_pick_ptr *pick, unsigned flags)
1086 {
1087         struct bch_read_bio *rbio;
1088         struct cache_promote_op *promote_op = NULL;
1089         unsigned skip = iter.bi_sector - bkey_start_offset(k.k);
1090         bool bounce = false, split, read_full = false;
1091
1092         EBUG_ON(bkey_start_offset(k.k) > iter.bi_sector ||
1093                 k.k->p.offset < bvec_iter_end_sector(iter));
1094
1095         /* only promote if we're not reading from the fastest tier: */
1096
1097         /*
1098          * XXX: multiple promotes can race with each other, wastefully. Keep a
1099          * list of outstanding promotes?
1100          */
1101         if ((flags & BCH_READ_PROMOTE) && pick->ca->mi.tier) {
1102                 /*
1103                  * biovec needs to be big enough to hold decompressed data, if
1104                  * the bch_write_extent() has to decompress/recompress it:
1105                  */
1106                 unsigned sectors =
1107                         max_t(unsigned, k.k->size,
1108                               pick->crc.uncompressed_size);
1109                 unsigned pages = DIV_ROUND_UP(sectors, PAGE_SECTORS);
1110
1111                 promote_op = kmalloc(sizeof(*promote_op) +
1112                                 sizeof(struct bio_vec) * pages, GFP_NOIO);
1113                 if (promote_op) {
1114                         struct bio *promote_bio = &promote_op->write.wbio.bio;
1115
1116                         bio_init(promote_bio);
1117                         promote_bio->bi_max_vecs = pages;
1118                         promote_bio->bi_io_vec  = promote_bio->bi_inline_vecs;
1119                         bounce = true;
1120                         /* could also set read_full */
1121                 }
1122         }
1123
1124         /*
1125          * note: if compression_type and csum_type both == none, then
1126          * compressed/uncompressed size is zero
1127          */
1128         if (pick->crc.compression_type != BCH_COMPRESSION_NONE ||
1129             (pick->crc.csum_type != BCH_CSUM_NONE &&
1130              (bvec_iter_sectors(iter) != pick->crc.uncompressed_size ||
1131               (flags & BCH_READ_FORCE_BOUNCE)))) {
1132                 read_full = true;
1133                 bounce = true;
1134         }
1135
1136         if (bounce) {
1137                 unsigned sectors = read_full
1138                         ? (pick->crc.compressed_size ?: k.k->size)
1139                         : bvec_iter_sectors(iter);
1140
1141                 rbio = container_of(bio_alloc_bioset(GFP_NOIO,
1142                                         DIV_ROUND_UP(sectors, PAGE_SECTORS),
1143                                         &c->bio_read_split),
1144                                     struct bch_read_bio, bio);
1145
1146                 bch_bio_alloc_pages_pool(c, &rbio->bio, sectors << 9);
1147                 split = true;
1148         } else if (!(flags & BCH_READ_MAY_REUSE_BIO) ||
1149                    !(flags & BCH_READ_IS_LAST)) {
1150                 /*
1151                  * Have to clone if there were any splits, due to error
1152                  * reporting issues: if a split errored and retrying didn't
1153                  * work, then when the error is reported to its parent (us)
1154                  * we can't tell whether it came from our part of the bio
1155                  * (in which case we should retry) or from the whole bio
1156                  * (in which case retrying would lose the error)
1157                  */
1158                 rbio = container_of(bio_clone_fast(&orig->bio,
1159                                         GFP_NOIO, &c->bio_read_split),
1160                                     struct bch_read_bio, bio);
1161                 rbio->bio.bi_iter = iter;
1162                 split = true;
1163         } else {
1164                 rbio = orig;
1165                 rbio->bio.bi_iter = iter;
1166                 split = false;
1167                 BUG_ON(bio_flagged(&rbio->bio, BIO_CHAIN));
1168         }
1169
1170         if (!(flags & BCH_READ_IS_LAST))
1171                 __bio_inc_remaining(&orig->bio);
1172
1173         if (split)
1174                 rbio->parent    = orig;
1175         else
1176                 rbio->orig_bi_end_io = orig->bio.bi_end_io;
1177         rbio->parent_iter       = iter;
1178
1179         rbio->inode             = k.k->p.inode;
1180         rbio->flags             = flags;
1181         rbio->bounce            = bounce;
1182         rbio->split             = split;
1183         rbio->crc               = pick->crc;
1184         /*
1185          * crc.compressed_size will be 0 if there wasn't any checksum
1186          * information; we also need to stash the original size of the bio if
1187          * we bounced (which isn't necessarily the original key size, if we
1188          * bounced only for promoting)
1189          */
1190         rbio->crc.compressed_size = bio_sectors(&rbio->bio);
1191         rbio->ptr               = pick->ptr;
1192         rbio->ca                = pick->ca;
1193         rbio->promote           = promote_op;
1194
1195         rbio->bio.bi_bdev       = pick->ca->disk_sb.bdev;
1196         rbio->bio.bi_opf        = orig->bio.bi_opf;
1197         rbio->bio.bi_iter.bi_sector = pick->ptr.offset;
1198         rbio->bio.bi_end_io     = bch_read_endio;
1199
1200         if (promote_op) {
1201                 struct bio *promote_bio = &promote_op->write.wbio.bio;
1202
1203                 promote_bio->bi_iter = rbio->bio.bi_iter;
1204                 memcpy(promote_bio->bi_io_vec, rbio->bio.bi_io_vec,
1205                        sizeof(struct bio_vec) * rbio->bio.bi_vcnt);
1206
1207                 bch_migrate_write_init(c, &promote_op->write,
1208                                        &c->promote_write_point,
1209                                        k, NULL,
1210                                        BCH_WRITE_ALLOC_NOWAIT);
1211                 promote_op->write.promote = true;
1212
1213                 if (rbio->crc.compression_type) {
1214                         promote_op->write.op.flags |= BCH_WRITE_DATA_COMPRESSED;
1215                         promote_op->write.op.crc = rbio->crc;
1216                         promote_op->write.op.size = k.k->size;
1217                 } else if (read_full) {
1218                         /*
1219                          * Adjust bio to correspond to _live_ portion of @k -
1220                          * which might be less than what we're actually reading:
1221                          */
1222                         bio_advance(promote_bio, rbio->crc.offset << 9);
1223                         BUG_ON(bio_sectors(promote_bio) < k.k->size);
1224                         promote_bio->bi_iter.bi_size = k.k->size << 9;
1225                 } else {
1226                         /*
1227                          * Set insert pos to correspond to what we're actually
1228                          * reading:
1229                          */
1230                         promote_op->write.op.pos.offset = iter.bi_sector;
1231                 }
1232
1233                 promote_bio->bi_iter.bi_sector =
1234                         promote_op->write.op.pos.offset;
1235         }
1236
1237         /* _after_ promote stuff has looked at rbio->crc.offset */
1238         if (read_full)
1239                 rbio->crc.offset += skip;
1240         else
1241                 rbio->bio.bi_iter.bi_sector += skip;
1242
1243         rbio->submit_time_us = local_clock_us();
1244
1245 #ifndef CONFIG_BCACHE_NO_IO
1246         generic_make_request(&rbio->bio);
1247 #else
1248         bio_endio(&rbio->bio);
1249 #endif
1250 }
1251
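/*
 * Walk the extents btree for @inode across the range covered by
 * @bvec_iter, issuing a read for each extent and zero-filling holes, until
 * the whole bio has been handled or a btree error ends the read.
 */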
1252 static void bch_read_iter(struct cache_set *c, struct bch_read_bio *rbio,
1253                           struct bvec_iter bvec_iter, u64 inode,
1254                           unsigned flags)
1255 {
1256         struct bio *bio = &rbio->bio;
1257         struct btree_iter iter;
1258         struct bkey_s_c k;
1259         int ret;
1260
1261         for_each_btree_key_with_holes(&iter, c, BTREE_ID_EXTENTS,
1262                                       POS(inode, bvec_iter.bi_sector), k) {
1263                 BKEY_PADDED(k) tmp;
1264                 struct extent_pick_ptr pick;
1265                 unsigned bytes, sectors;
1266                 bool is_last;
1267
1268                 /*
1269                  * Unlock the iterator while the btree node's lock is still in
1270                  * cache, before doing the IO:
1271                  */
1272                 bkey_reassemble(&tmp.k, k);
1273                 k = bkey_i_to_s_c(&tmp.k);
1274                 bch_btree_iter_unlock(&iter);
1275
1276                 bch_extent_pick_ptr(c, k, &pick);
1277                 if (IS_ERR(pick.ca)) {
1278                         bcache_io_error(c, bio, "no device to read from");
1279                         bio_endio(bio);
1280                         return;
1281                 }
1282
1283                 sectors = min_t(u64, k.k->p.offset,
1284                                 bvec_iter_end_sector(bvec_iter)) -
1285                         bvec_iter.bi_sector;
1286                 bytes = sectors << 9;
1287                 is_last = bytes == bvec_iter.bi_size;
1288                 swap(bvec_iter.bi_size, bytes);
1289
1290                 if (is_last)
1291                         flags |= BCH_READ_IS_LAST;
1292
1293                 if (pick.ca) {
1294                         PTR_BUCKET(pick.ca, &pick.ptr)->read_prio =
1295                                 c->prio_clock[READ].hand;
1296
1297                         bch_read_extent_iter(c, rbio, bvec_iter,
1298                                              k, &pick, flags);
1299
1300                         flags &= ~BCH_READ_MAY_REUSE_BIO;
1301                 } else {
1302                         zero_fill_bio_iter(bio, bvec_iter);
1303
1304                         if (is_last)
1305                                 bio_endio(bio);
1306                 }
1307
1308                 if (is_last)
1309                         return;
1310
1311                 swap(bvec_iter.bi_size, bytes);
1312                 bio_advance_iter(bio, &bvec_iter, bytes);
1313         }
1314
1315         /*
1316          * If we get here, it better have been because there was an error
1317          * reading a btree node
1318          */
1319         ret = bch_btree_iter_unlock(&iter);
1320         BUG_ON(!ret);
1321         bcache_io_error(c, bio, "btree IO error %i", ret);
1322         bio_endio(bio);
1323 }
1324
1325 void bch_read(struct cache_set *c, struct bch_read_bio *bio, u64 inode)
1326 {
1327         bch_increment_clock(c, bio_sectors(&bio->bio), READ);
1328
1329         bch_read_iter(c, bio, bio->bio.bi_iter, inode,
1330                       BCH_READ_FORCE_BOUNCE|
1331                       BCH_READ_RETRY_IF_STALE|
1332                       BCH_READ_PROMOTE|
1333                       BCH_READ_MAY_REUSE_BIO);
1334 }
1335 EXPORT_SYMBOL(bch_read);
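/*
 * Illustrative sketch only (not code from this file): a read frontend that
 * has allocated its bio from a bioset with struct bch_read_bio as the
 * per-bio private area would do roughly:
 *
 *	struct bch_read_bio *rbio =
 *		container_of(bio, struct bch_read_bio, bio);
 *
 *	bch_read(c, rbio, inode_nr);
 *
 * where inode_nr is the bcache inode the extent keys live under; the names
 * here are placeholders, not API from this file.
 */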
1336
1337 /**
1338  * bch_read_retry - re-submit a bio originally from bch_read()
1339  */
1340 static void bch_read_retry(struct cache_set *c, struct bch_read_bio *rbio)
1341 {
1342         struct bch_read_bio *parent = bch_rbio_parent(rbio);
1343         struct bvec_iter iter = rbio->parent_iter;
1344         u64 inode = rbio->inode;
1345
1346         trace_bcache_read_retry(&rbio->bio);
1347
1348         if (rbio->split)
1349                 bch_rbio_free(c, rbio);
1350         else
1351                 rbio->bio.bi_end_io = rbio->orig_bi_end_io;
1352
1353         bch_read_iter(c, parent, iter, inode,
1354                       BCH_READ_FORCE_BOUNCE|
1355                       BCH_READ_RETRY_IF_STALE|
1356                       BCH_READ_PROMOTE);
1357 }
1358
1359 void bch_read_retry_work(struct work_struct *work)
1360 {
1361         struct cache_set *c = container_of(work, struct cache_set,
1362                                            read_retry_work);
1363         struct bch_read_bio *rbio;
1364         struct bio *bio;
1365         unsigned long flags;
1366
1367         while (1) {
1368                 spin_lock_irqsave(&c->read_retry_lock, flags);
1369                 bio = bio_list_pop(&c->read_retry_list);
1370                 spin_unlock_irqrestore(&c->read_retry_lock, flags);
1371
1372                 if (!bio)
1373                         break;
1374
1375                 rbio = container_of(bio, struct bch_read_bio, bio);
1376                 bch_read_retry(c, rbio);
1377         }
1378 }