]> git.sesse.net Git - bcachefs-tools-debian/blobdiff - libbcachefs/move.c
Update bcachefs sources to 15f6e66e86 bcachefs: pass around bset_tree less
[bcachefs-tools-debian] / libbcachefs / move.c
index a176484ae91d00d8c01936fe1169a91fe3fbb5b7..4a5e435bfe4b1bffee773c1dc1005d449a6ac403 100644 (file)
@@ -5,7 +5,9 @@
 #include "buckets.h"
 #include "inode.h"
 #include "io.h"
+#include "journal_reclaim.h"
 #include "move.h"
+#include "replicas.h"
 #include "super-io.h"
 #include "keylist.h"
 
 
 #include <trace/events/bcachefs.h>
 
+#define SECTORS_IN_FLIGHT_PER_DEVICE   2048
+
 struct moving_io {
        struct list_head        list;
        struct closure          cl;
        bool                    read_completed;
-       unsigned                sectors;
+
+       unsigned                read_sectors;
+       unsigned                write_sectors;
 
        struct bch_read_bio     rbio;
 
@@ -34,7 +40,11 @@ struct moving_context {
        struct bch_move_stats   *stats;
 
        struct list_head        reads;
-       atomic_t                sectors_in_flight;
+
+       /* in flight sectors: */
+       atomic_t                read_sectors;
+       atomic_t                write_sectors;
+
        wait_queue_head_t       wait;
 };
 
@@ -58,8 +68,8 @@ static int bch2_migrate_index_update(struct bch_write_op *op)
                BKEY_PADDED(k) _new, _insert;
                struct bch_extent_ptr *ptr;
                struct bch_extent_crc_unpacked crc;
-               unsigned nr_dirty;
                bool did_work = false;
+               int nr;
 
                if (btree_iter_err(k)) {
                        ret = bch2_btree_iter_unlock(&iter);
@@ -116,29 +126,29 @@ static int bch2_migrate_index_update(struct bch_write_op *op)
                                (struct bch_extent_crc_unpacked) { 0 });
                bch2_extent_normalize(c, extent_i_to_s(insert).s);
                bch2_extent_mark_replicas_cached(c, extent_i_to_s(insert),
-                                                c->opts.data_replicas);
+                                                op->opts.background_target,
+                                                op->opts.data_replicas);
 
                /*
                 * It's possible we race, and for whatever reason the extent now
                 * has fewer replicas than when we last looked at it - meaning
                 * we need to get a disk reservation here:
                 */
-               nr_dirty = bch2_extent_nr_dirty_ptrs(bkey_i_to_s_c(&insert->k_i));
-               if (m->nr_ptrs_reserved < nr_dirty) {
-                       unsigned sectors = (nr_dirty - m->nr_ptrs_reserved) *
-                                       keylist_sectors(keys);
-
+               nr = bch2_extent_nr_dirty_ptrs(bkey_i_to_s_c(&insert->k_i)) -
+                       (bch2_extent_nr_dirty_ptrs(k) + m->nr_ptrs_reserved);
+               if (nr > 0) {
                        /*
                         * can't call bch2_disk_reservation_add() with btree
                         * locks held, at least not without a song and dance
                         */
                        bch2_btree_iter_unlock(&iter);
 
-                       ret = bch2_disk_reservation_add(c, &op->res, sectors, 0);
+                       ret = bch2_disk_reservation_add(c, &op->res,
+                                       keylist_sectors(keys) * nr, 0);
                        if (ret)
                                goto out;
 
-                       m->nr_ptrs_reserved = nr_dirty;
+                       m->nr_ptrs_reserved += nr;
                        goto next;
                }
 
@@ -148,7 +158,7 @@ static int bch2_migrate_index_update(struct bch_write_op *op)
                        break;
 
                ret = bch2_btree_insert_at(c, &op->res,
-                               NULL, op_journal_seq(op),
+                               op_journal_seq(op),
                                BTREE_INSERT_ATOMIC|
                                BTREE_INSERT_NOFAIL|
                                BTREE_INSERT_USE_RESERVE|
@@ -206,7 +216,6 @@ void bch2_migrate_read_done(struct migrate_write *m, struct bch_read_bio *rbio)
 }
 
 int bch2_migrate_write_init(struct bch_fs *c, struct migrate_write *m,
-                           struct bch_devs_mask *devs,
                            struct write_point_specifier wp,
                            struct bch_io_opts io_opts,
                            enum data_cmd data_cmd,
@@ -217,13 +226,13 @@ int bch2_migrate_write_init(struct bch_fs *c, struct migrate_write *m,
 
        m->data_cmd     = data_cmd;
        m->data_opts    = data_opts;
-       m->nr_ptrs_reserved = bch2_extent_nr_dirty_ptrs(k);
+       m->nr_ptrs_reserved = 0;
 
-       bch2_write_op_init(&m->op, c);
-       m->op.csum_type = bch2_data_checksum_type(c, io_opts.data_checksum);
+       bch2_write_op_init(&m->op, c, io_opts);
        m->op.compression_type =
-               bch2_compression_opt_to_type[io_opts.compression];
-       m->op.devs      = devs;
+               bch2_compression_opt_to_type[io_opts.background_compression ?:
+                                            io_opts.compression];
+       m->op.target    = data_opts.target,
        m->op.write_point = wp;
 
        if (m->data_opts.btree_insert_flags & BTREE_INSERT_USE_RESERVE)
@@ -240,19 +249,20 @@ int bch2_migrate_write_init(struct bch_fs *c, struct migrate_write *m,
        m->op.index_update_fn   = bch2_migrate_index_update;
 
        switch (data_cmd) {
-       case DATA_ADD_REPLICAS:
-               if (m->nr_ptrs_reserved < c->opts.data_replicas) {
-                       m->op.nr_replicas = c->opts.data_replicas - m->nr_ptrs_reserved;
+       case DATA_ADD_REPLICAS: {
+               int nr = (int) io_opts.data_replicas -
+                       bch2_extent_nr_dirty_ptrs(k);
+
+               if (nr > 0) {
+                       m->op.nr_replicas = m->nr_ptrs_reserved = nr;
 
                        ret = bch2_disk_reservation_get(c, &m->op.res,
-                                                       k.k->size,
-                                                       m->op.nr_replicas, 0);
+                                       k.k->size, m->op.nr_replicas, 0);
                        if (ret)
                                return ret;
-
-                       m->nr_ptrs_reserved = c->opts.data_replicas;
                }
                break;
+       }
        case DATA_REWRITE:
                break;
        case DATA_PROMOTE:
@@ -279,22 +289,33 @@ static void move_free(struct closure *cl)
                if (bv->bv_page)
                        __free_page(bv->bv_page);
 
-       atomic_sub(io->sectors, &ctxt->sectors_in_flight);
        wake_up(&ctxt->wait);
 
        kfree(io);
 }
 
+static void move_write_done(struct closure *cl)
+{
+       struct moving_io *io = container_of(cl, struct moving_io, cl);
+
+       atomic_sub(io->write_sectors, &io->write.ctxt->write_sectors);
+       closure_return_with_destructor(cl, move_free);
+}
+
 static void move_write(struct closure *cl)
 {
        struct moving_io *io = container_of(cl, struct moving_io, cl);
 
-       if (likely(!io->rbio.bio.bi_status)) {
-               bch2_migrate_read_done(&io->write, &io->rbio);
-               closure_call(&io->write.op.cl, bch2_write, NULL, cl);
+       if (unlikely(io->rbio.bio.bi_status || io->rbio.hole)) {
+               closure_return_with_destructor(cl, move_free);
+               return;
        }
 
-       closure_return_with_destructor(cl, move_free);
+       bch2_migrate_read_done(&io->write, &io->rbio);
+
+       atomic_add(io->write_sectors, &io->write.ctxt->write_sectors);
+       closure_call(&io->write.op.cl, bch2_write, NULL, cl);
+       continue_at(cl, move_write_done, NULL);
 }
 
 static inline struct moving_io *next_pending_write(struct moving_context *ctxt)
@@ -310,32 +331,65 @@ static void move_read_endio(struct bio *bio)
        struct moving_io *io = container_of(bio, struct moving_io, rbio.bio);
        struct moving_context *ctxt = io->write.ctxt;
 
+       atomic_sub(io->read_sectors, &ctxt->read_sectors);
        io->read_completed = true;
+
        if (next_pending_write(ctxt))
                wake_up(&ctxt->wait);
 
        closure_put(&ctxt->cl);
 }
 
+static void do_pending_writes(struct moving_context *ctxt)
+{
+       struct moving_io *io;
+
+       while ((io = next_pending_write(ctxt))) {
+               list_del(&io->list);
+               closure_call(&io->cl, move_write, NULL, &ctxt->cl);
+       }
+}
+
+#define move_ctxt_wait_event(_ctxt, _cond)                     \
+do {                                                           \
+       do_pending_writes(_ctxt);                               \
+                                                               \
+       if (_cond)                                              \
+               break;                                          \
+       __wait_event((_ctxt)->wait,                             \
+                    next_pending_write(_ctxt) || (_cond));     \
+} while (1)
+
+static void bch2_move_ctxt_wait_for_io(struct moving_context *ctxt)
+{
+       unsigned sectors_pending = atomic_read(&ctxt->write_sectors);
+
+       move_ctxt_wait_event(ctxt,
+               !atomic_read(&ctxt->write_sectors) ||
+               atomic_read(&ctxt->write_sectors) != sectors_pending);
+}
+
 static int bch2_move_extent(struct bch_fs *c,
                            struct moving_context *ctxt,
-                           struct bch_devs_mask *devs,
                            struct write_point_specifier wp,
                            struct bch_io_opts io_opts,
                            struct bkey_s_c_extent e,
                            enum data_cmd data_cmd,
                            struct data_opts data_opts)
 {
-       struct extent_pick_ptr pick;
        struct moving_io *io;
        const struct bch_extent_ptr *ptr;
        struct bch_extent_crc_unpacked crc;
        unsigned sectors = e.k->size, pages;
        int ret = -ENOMEM;
 
-       bch2_extent_pick_ptr(c, e.s_c, NULL, &pick);
-       if (IS_ERR_OR_NULL(pick.ca))
-               return pick.ca ? PTR_ERR(pick.ca) : 0;
+       move_ctxt_wait_event(ctxt,
+               atomic_read(&ctxt->write_sectors) <
+               SECTORS_IN_FLIGHT_PER_DEVICE);
+
+       move_ctxt_wait_event(ctxt,
+               atomic_read(&ctxt->read_sectors) <
+               SECTORS_IN_FLIGHT_PER_DEVICE);
 
        /* write path might have to decompress data: */
        extent_for_each_ptr_crc(e, ptr, crc)
@@ -347,8 +401,9 @@ static int bch2_move_extent(struct bch_fs *c,
        if (!io)
                goto err;
 
-       io->write.ctxt  = ctxt;
-       io->sectors     = e.k->size;
+       io->write.ctxt          = ctxt;
+       io->read_sectors        = e.k->size;
+       io->write_sectors       = e.k->size;
 
        bio_init(&io->write.op.wbio.bio, io->bi_inline_vecs, pages);
        bio_set_prio(&io->write.op.wbio.bio,
@@ -356,11 +411,12 @@ static int bch2_move_extent(struct bch_fs *c,
        io->write.op.wbio.bio.bi_iter.bi_size = sectors << 9;
 
        bch2_bio_map(&io->write.op.wbio.bio, NULL);
-       if (bio_alloc_pages(&io->write.op.wbio.bio, GFP_KERNEL))
+       if (bch2_bio_alloc_pages(&io->write.op.wbio.bio, GFP_KERNEL))
                goto err_free;
 
        io->rbio.opts = io_opts;
        bio_init(&io->rbio.bio, io->bi_inline_vecs, pages);
+       io->rbio.bio.bi_vcnt = pages;
        bio_set_prio(&io->rbio.bio, IOPRIO_PRIO_VALUE(IOPRIO_CLASS_IDLE, 0));
        io->rbio.bio.bi_iter.bi_size = sectors << 9;
 
@@ -368,8 +424,8 @@ static int bch2_move_extent(struct bch_fs *c,
        io->rbio.bio.bi_iter.bi_sector  = bkey_start_offset(e.k);
        io->rbio.bio.bi_end_io          = move_read_endio;
 
-       ret = bch2_migrate_write_init(c, &io->write, devs, wp,
-                                     io_opts, data_cmd, data_opts, e.s_c);
+       ret = bch2_migrate_write_init(c, &io->write, wp, io_opts,
+                                     data_cmd, data_opts, e.s_c);
        if (ret)
                goto err_free_pages;
 
@@ -378,7 +434,7 @@ static int bch2_move_extent(struct bch_fs *c,
 
        trace_move_extent(e.k);
 
-       atomic_add(io->sectors, &ctxt->sectors_in_flight);
+       atomic_add(io->read_sectors, &ctxt->read_sectors);
        list_add_tail(&io->list, &ctxt->reads);
 
        /*
@@ -386,51 +442,21 @@ static int bch2_move_extent(struct bch_fs *c,
         * ctxt when doing wakeup
         */
        closure_get(&ctxt->cl);
-       bch2_read_extent(c, &io->rbio, e, &pick, BCH_READ_NODECODE);
+       bch2_read_extent(c, &io->rbio, e.s_c,
+                        BCH_READ_NODECODE|
+                        BCH_READ_LAST_FRAGMENT);
        return 0;
 err_free_pages:
        bio_free_pages(&io->write.op.wbio.bio);
 err_free:
        kfree(io);
 err:
-       percpu_ref_put(&pick.ca->io_ref);
        trace_move_alloc_fail(e.k);
        return ret;
 }
 
-static void do_pending_writes(struct moving_context *ctxt)
-{
-       struct moving_io *io;
-
-       while ((io = next_pending_write(ctxt))) {
-               list_del(&io->list);
-               closure_call(&io->cl, move_write, NULL, &ctxt->cl);
-       }
-}
-
-#define move_ctxt_wait_event(_ctxt, _cond)                     \
-do {                                                           \
-       do_pending_writes(_ctxt);                               \
-                                                               \
-       if (_cond)                                              \
-               break;                                          \
-       __wait_event((_ctxt)->wait,                             \
-                    next_pending_write(_ctxt) || (_cond));     \
-} while (1)
-
-static void bch2_move_ctxt_wait_for_io(struct moving_context *ctxt)
-{
-       unsigned sectors_pending = atomic_read(&ctxt->sectors_in_flight);
-
-       move_ctxt_wait_event(ctxt,
-               !atomic_read(&ctxt->sectors_in_flight) ||
-               atomic_read(&ctxt->sectors_in_flight) != sectors_pending);
-}
-
 int bch2_move_data(struct bch_fs *c,
                   struct bch_ratelimit *rate,
-                  unsigned sectors_in_flight,
-                  struct bch_devs_mask *devs,
                   struct write_point_specifier wp,
                   struct bpos start,
                   struct bpos end,
@@ -460,13 +486,6 @@ int bch2_move_data(struct bch_fs *c,
                bch2_ratelimit_reset(rate);
 
        while (!kthread || !(ret = kthread_should_stop())) {
-               if (atomic_read(&ctxt.sectors_in_flight) >= sectors_in_flight) {
-                       bch2_btree_iter_unlock(&stats->iter);
-                       move_ctxt_wait_event(&ctxt,
-                                            atomic_read(&ctxt.sectors_in_flight) <
-                                            sectors_in_flight);
-               }
-
                if (rate &&
                    bch2_ratelimit_delay(rate) &&
                    (bch2_btree_iter_unlock(&stats->iter),
@@ -519,7 +538,7 @@ peek:
                k = bkey_i_to_s_c(&tmp.k);
                bch2_btree_iter_unlock(&stats->iter);
 
-               ret2 = bch2_move_extent(c, &ctxt, devs, wp, io_opts,
+               ret2 = bch2_move_extent(c, &ctxt, wp, io_opts,
                                        bkey_s_c_to_extent(k),
                                        data_cmd, data_opts);
                if (ret2) {
@@ -545,11 +564,10 @@ next_nondata:
 
        bch2_btree_iter_unlock(&stats->iter);
 
-       move_ctxt_wait_event(&ctxt, !atomic_read(&ctxt.sectors_in_flight));
+       move_ctxt_wait_event(&ctxt, list_empty(&ctxt.reads));
        closure_sync(&ctxt.cl);
 
-       EBUG_ON(!list_empty(&ctxt.reads));
-       EBUG_ON(atomic_read(&ctxt.sectors_in_flight));
+       EBUG_ON(atomic_read(&ctxt.write_sectors));
 
        trace_move_data(c,
                        atomic64_read(&stats->sectors_moved),
@@ -668,14 +686,15 @@ static enum data_cmd rereplicate_pred(struct bch_fs *c, void *arg,
                                      struct bch_io_opts *io_opts,
                                      struct data_opts *data_opts)
 {
-       unsigned nr_good = bch2_extent_nr_good_ptrs(c, e);
+       unsigned nr_good = bch2_extent_durability(c, e);
        unsigned replicas = type == BKEY_TYPE_BTREE
                ? c->opts.metadata_replicas
-               : c->opts.data_replicas;
+               : io_opts->data_replicas;
 
        if (!nr_good || nr_good >= replicas)
                return DATA_SKIP;
 
+       data_opts->target               = 0;
        data_opts->btree_insert_flags = 0;
        return DATA_ADD_REPLICAS;
 }
@@ -691,6 +710,7 @@ static enum data_cmd migrate_pred(struct bch_fs *c, void *arg,
        if (!bch2_extent_has_device(e, op->migrate.dev))
                return DATA_SKIP;
 
+       data_opts->target               = 0;
        data_opts->btree_insert_flags   = 0;
        data_opts->rewrite_dev          = op->migrate.dev;
        return DATA_REWRITE;
@@ -705,13 +725,12 @@ int bch2_data_job(struct bch_fs *c,
        switch (op.op) {
        case BCH_DATA_OP_REREPLICATE:
                stats->data_type = BCH_DATA_JOURNAL;
-               ret = bch2_journal_flush_device(&c->journal, -1);
+               ret = bch2_journal_flush_device_pins(&c->journal, -1);
 
                ret = bch2_move_btree(c, rereplicate_pred, c, stats) ?: ret;
                ret = bch2_gc_btree_replicas(c) ?: ret;
 
-               ret = bch2_move_data(c, NULL, SECTORS_IN_FLIGHT_PER_DEVICE,
-                                    NULL,
+               ret = bch2_move_data(c, NULL,
                                     writepoint_hashed((unsigned long) current),
                                     op.start,
                                     op.end,
@@ -723,13 +742,12 @@ int bch2_data_job(struct bch_fs *c,
                        return -EINVAL;
 
                stats->data_type = BCH_DATA_JOURNAL;
-               ret = bch2_journal_flush_device(&c->journal, op.migrate.dev);
+               ret = bch2_journal_flush_device_pins(&c->journal, op.migrate.dev);
 
                ret = bch2_move_btree(c, migrate_pred, &op, stats) ?: ret;
                ret = bch2_gc_btree_replicas(c) ?: ret;
 
-               ret = bch2_move_data(c, NULL, SECTORS_IN_FLIGHT_PER_DEVICE,
-                                    NULL,
+               ret = bch2_move_data(c, NULL,
                                     writepoint_hashed((unsigned long) current),
                                     op.start,
                                     op.end,