struct journal_entry_pin *,
journal_pin_flush_fn);
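+/* wake up waiters on j->wait as well as async closure waiters: */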
+static inline void journal_wake(struct journal *j)
+{
+ wake_up(&j->wait);
+ closure_wake_up(&j->async_wait);
+}
+
static inline struct journal_buf *journal_cur_buf(struct journal *j)
{
return j->buf + j->reservations.idx;
/* Sequence number of oldest dirty journal entry */
-static inline u64 last_seq(struct journal *j)
+static inline u64 journal_last_seq(struct journal *j)
{
- return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1;
+ return j->pin.front;
+}
+
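+/* Sequence number of most recent journal entry (the one currently open): */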
+static inline u64 journal_cur_seq(struct journal *j)
+{
+ BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq));
+
+ return j->pin.back - 1;
}
static inline u64 journal_pin_seq(struct journal *j,
struct journal_entry_pin_list *pin_list)
{
- return last_seq(j) + fifo_entry_idx(&j->pin, pin_list);
+ return fifo_entry_idx_abs(&j->pin, pin_list);
+}
+
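+/* returns seq of the journal entry @pin is pinning, or 0 if pin not active: */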
+u64 bch2_journal_pin_seq(struct journal *j, struct journal_entry_pin *pin)
+{
+ u64 ret = 0;
+
+ spin_lock(&j->lock);
+ if (journal_pin_active(pin))
+ ret = journal_pin_seq(j, pin->pin_list);
+ spin_unlock(&j->lock);
+
+ return ret;
}
static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf,
if (!entry)
return NULL;
+ if (!entry->u64s)
+ return ERR_PTR(-EINVAL);
+
k = entry->start;
*level = entry->level;
if (!seq)
return 0;
- journal_seq = atomic64_read(&j->seq);
+ spin_lock(&j->lock);
+ journal_seq = journal_cur_seq(j);
+ spin_unlock(&j->lock);
/* Interior updates aren't journalled: */
BUG_ON(b->level);
* Given a journal entry we just read, add it to the list of journal entries to
* be replayed:
*/
-static int journal_entry_add(struct bch_fs *c, struct journal_list *jlist,
- struct jset *j)
+static int journal_entry_add(struct bch_fs *c, struct bch_dev *ca,
+ struct journal_list *jlist, struct jset *j)
{
struct journal_replay *i, *pos;
struct list_head *where;
__le64 last_seq;
int ret;
- mutex_lock(&jlist->lock);
-
last_seq = !list_empty(jlist->head)
? list_last_entry(jlist->head, struct journal_replay,
list)->j.last_seq
memcmp(j, &i->j, bytes), c,
"found duplicate but non identical journal entries (seq %llu)",
le64_to_cpu(j->seq));
-
- ret = JOURNAL_ENTRY_ADD_OK;
- goto out;
+ goto found;
}
if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
goto out;
}
- memcpy(&i->j, j, bytes);
list_add(&i->list, where);
+ i->devs.nr = 0;
+ memcpy(&i->j, j, bytes);
+found:
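+ /* track which devices this journal entry was read from: */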
+ if (!bch2_dev_list_has_dev(i->devs, ca->dev_idx))
+ bch2_dev_list_add_dev(&i->devs, ca->dev_idx);
+ else
+ fsck_err_on(1, c, "duplicate journal entries on same device");
ret = JOURNAL_ENTRY_ADD_OK;
out:
fsck_err:
- mutex_unlock(&jlist->lock);
return ret;
}
}};
}
+/* this fills in a range with empty jset_entries: */
static void journal_entry_null_range(void *start, void *end)
{
struct jset_entry *entry;
memset(entry, 0, sizeof(*entry));
}
-static int journal_validate_key(struct bch_fs *c, struct jset *j,
+static int journal_validate_key(struct bch_fs *c, struct jset *jset,
struct jset_entry *entry,
struct bkey_i *k, enum bkey_type key_type,
const char *type)
return 0;
}
- if (JSET_BIG_ENDIAN(j) != CPU_BIG_ENDIAN)
+ if (JSET_BIG_ENDIAN(jset) != CPU_BIG_ENDIAN)
bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
if (invalid) {
bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
bkey_i_to_s_c(k));
- mustfix_fsck_err(c, "invalid %s in journal: %s", type, buf);
+ mustfix_fsck_err(c, "invalid %s in journal: %s\n%s",
+ type, invalid, buf);
le16_add_cpu(&entry->u64s, -k->k.u64s);
memmove(k, bkey_next(k), next - (void *) bkey_next(k));
#define journal_entry_err_on(cond, c, msg, ...) \
((cond) ? journal_entry_err(c, msg, ##__VA_ARGS__) : false)
-static int __journal_entry_validate(struct bch_fs *c, struct jset *j,
- int write)
+static int journal_entry_validate_entries(struct bch_fs *c, struct jset *jset,
+ int write)
{
struct jset_entry *entry;
int ret = 0;
- vstruct_for_each(j, entry) {
+ vstruct_for_each(jset, entry) {
+ void *next = vstruct_next(entry);
struct bkey_i *k;
if (journal_entry_err_on(vstruct_next(entry) >
- vstruct_last(j), c,
+ vstruct_last(jset), c,
"journal entry extends past end of jset")) {
- j->u64s = cpu_to_le64((u64 *) entry - j->_data);
+ jset->u64s = cpu_to_le32((u64 *) entry - jset->_data);
break;
}
switch (entry->type) {
case JOURNAL_ENTRY_BTREE_KEYS:
vstruct_for_each(entry, k) {
- ret = journal_validate_key(c, j, entry, k,
+ ret = journal_validate_key(c, jset, entry, k,
bkey_type(entry->level,
entry->btree_id),
"key");
if (journal_entry_err_on(!entry->u64s ||
le16_to_cpu(entry->u64s) != k->k.u64s, c,
"invalid btree root journal entry: wrong number of keys")) {
- journal_entry_null_range(entry,
- vstruct_next(entry));
+ /*
+ * we don't want to null out this jset_entry,
+ * just the contents, so that later we can tell
+ * we were _supposed_ to have a btree root
+ */
+ entry->u64s = 0;
+ journal_entry_null_range(vstruct_next(entry), next);
continue;
}
- ret = journal_validate_key(c, j, entry, k,
+ ret = journal_validate_key(c, jset, entry, k,
BKEY_TYPE_BTREE, "btree root");
if (ret)
goto fsck_err;
}
static int journal_entry_validate(struct bch_fs *c,
- struct jset *j, u64 sector,
+ struct jset *jset, u64 sector,
unsigned bucket_sectors_left,
unsigned sectors_read,
int write)
{
- size_t bytes = vstruct_bytes(j);
+ size_t bytes = vstruct_bytes(jset);
struct bch_csum csum;
int ret = 0;
- if (le64_to_cpu(j->magic) != jset_magic(c))
+ if (le64_to_cpu(jset->magic) != jset_magic(c))
return JOURNAL_ENTRY_NONE;
- if (le32_to_cpu(j->version) != BCACHE_JSET_VERSION) {
+ if (le32_to_cpu(jset->version) != BCACHE_JSET_VERSION) {
bch_err(c, "unknown journal entry version %u",
- le32_to_cpu(j->version));
+ le32_to_cpu(jset->version));
return BCH_FSCK_UNKNOWN_VERSION;
}
if (bytes > sectors_read << 9)
return JOURNAL_ENTRY_REREAD;
- if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(j)), c,
+ if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(jset)), c,
"journal entry with unknown csum type %llu sector %lluu",
- JSET_CSUM_TYPE(j), sector))
+ JSET_CSUM_TYPE(jset), sector))
return JOURNAL_ENTRY_BAD;
- csum = csum_vstruct(c, JSET_CSUM_TYPE(j), journal_nonce(j), j);
- if (journal_entry_err_on(bch2_crc_cmp(csum, j->csum), c,
+ csum = csum_vstruct(c, JSET_CSUM_TYPE(jset), journal_nonce(jset), jset);
+ if (journal_entry_err_on(bch2_crc_cmp(csum, jset->csum), c,
"journal checksum bad, sector %llu", sector)) {
/* XXX: retry IO, when we start retrying checksum errors */
/* XXX: note we might have missing journal entries */
return JOURNAL_ENTRY_BAD;
}
- bch2_encrypt(c, JSET_CSUM_TYPE(j), journal_nonce(j),
- j->encrypted_start,
- vstruct_end(j) - (void *) j->encrypted_start);
+ bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
+ jset->encrypted_start,
+ vstruct_end(jset) - (void *) jset->encrypted_start);
- if (journal_entry_err_on(le64_to_cpu(j->last_seq) > le64_to_cpu(j->seq), c,
+ if (journal_entry_err_on(le64_to_cpu(jset->last_seq) > le64_to_cpu(jset->seq), c,
"invalid journal entry: last_seq > seq"))
- j->last_seq = j->seq;
+ jset->last_seq = jset->seq;
- return __journal_entry_validate(c, j, write);
+ return 0;
fsck_err:
return ret;
}
end - offset, buf->size >> 9);
bio_reset(bio);
- bio->bi_bdev = ca->disk_sb.bdev;
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_iter.bi_sector = offset;
bio->bi_iter.bi_size = sectors_read << 9;
bio_set_op_attrs(bio, REQ_OP_READ, 0);
ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
- ret = journal_entry_add(c, jlist, j);
+ mutex_lock(&jlist->lock);
+ ret = journal_entry_add(c, ca, jlist, j);
+ mutex_unlock(&jlist->lock);
+
switch (ret) {
case JOURNAL_ENTRY_ADD_OK:
*entries_found = true;
for_each_jset_entry_type(entry, &i->j,
JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
- seq = le64_to_cpu(entry->_data[0]);
+ struct jset_entry_blacklist *bl_entry =
+ container_of(entry, struct jset_entry_blacklist, entry);
+ seq = le64_to_cpu(bl_entry->seq);
bch_verbose(c, "blacklisting existing journal seq %llu", seq);
struct journal_replay *i;
struct journal_entry_pin_list *p;
struct bch_dev *ca;
- u64 cur_seq, end_seq;
+ u64 cur_seq, end_seq, seq;
unsigned iter, keys = 0, entries = 0;
+ size_t nr;
+ bool degraded = false;
int ret = 0;
closure_init_stack(&jlist.cl);
jlist.head = list;
jlist.ret = 0;
- for_each_readable_member(ca, c, iter) {
- percpu_ref_get(&ca->io_ref);
- closure_call(&ca->journal.read,
- bch2_journal_read_device,
- system_unbound_wq,
- &jlist.cl);
+ for_each_member_device(ca, c, iter) {
+ if (!(bch2_dev_has_data(c, ca) & (1 << BCH_DATA_JOURNAL)))
+ continue;
+
+ if ((ca->mi.state == BCH_MEMBER_STATE_RW ||
+ ca->mi.state == BCH_MEMBER_STATE_RO) &&
+ percpu_ref_tryget(&ca->io_ref))
+ closure_call(&ca->journal.read,
+ bch2_journal_read_device,
+ system_unbound_wq,
+ &jlist.cl);
+ else
+ degraded = true;
}
closure_sync(&jlist.cl);
fsck_err_on(c->sb.clean && journal_has_keys(list), c,
"filesystem marked clean but journal has keys to replay");
+ list_for_each_entry(i, list, list) {
+ ret = journal_entry_validate_entries(c, &i->j, READ);
+ if (ret)
+ goto fsck_err;
+
+ /*
+ * If we're mounting in degraded mode - if we didn't read all
+ * the devices - this is wrong:
+ */
+
+ if (!degraded &&
+ (test_bit(BCH_FS_REBUILD_REPLICAS, &c->flags) ||
+ fsck_err_on(!bch2_replicas_marked(c, BCH_DATA_JOURNAL,
+ i->devs), c,
+ "superblock not marked as containing replicas (type %u)",
+ BCH_DATA_JOURNAL))) {
+ ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, i->devs);
+ if (ret)
+ return ret;
+ }
+ }
+
i = list_last_entry(list, struct journal_replay, list);
- unfixable_fsck_err_on(le64_to_cpu(i->j.seq) -
- le64_to_cpu(i->j.last_seq) + 1 > j->pin.size, c,
- "too many journal entries open for refcount fifo");
+ nr = le64_to_cpu(i->j.seq) - le64_to_cpu(i->j.last_seq) + 1;
+
+ if (nr > j->pin.size) {
+ free_fifo(&j->pin);
+ init_fifo(&j->pin, roundup_pow_of_two(nr), GFP_KERNEL);
+ if (!j->pin.data) {
+ bch_err(c, "error reallocating journal fifo (%zu open entries)", nr);
+ return -ENOMEM;
+ }
+ }
atomic64_set(&j->seq, le64_to_cpu(i->j.seq));
j->last_seq_ondisk = le64_to_cpu(i->j.last_seq);
j->pin.front = le64_to_cpu(i->j.last_seq);
j->pin.back = le64_to_cpu(i->j.seq) + 1;
- BUG_ON(last_seq(j) != le64_to_cpu(i->j.last_seq));
- BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
- &fifo_peek_back(&j->pin));
-
- fifo_for_each_entry_ptr(p, &j->pin, iter) {
+ fifo_for_each_entry_ptr(p, &j->pin, seq) {
INIT_LIST_HEAD(&p->list);
INIT_LIST_HEAD(&p->flushed);
atomic_set(&p->count, 0);
+ p->devs.nr = 0;
}
mutex_lock(&j->blacklist_lock);
p = journal_seq_pin(j, le64_to_cpu(i->j.seq));
atomic_set(&p->count, 1);
+ p->devs = i->devs;
if (journal_seq_blacklist_read(j, i, p)) {
mutex_unlock(&j->blacklist_lock);
mutex_unlock(&j->blacklist_lock);
- cur_seq = last_seq(j);
+ cur_seq = journal_last_seq(j);
end_seq = le64_to_cpu(list_last_entry(list,
struct journal_replay, list)->j.seq);
fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
"journal entries %llu-%llu missing! (replaying %llu-%llu)",
cur_seq, le64_to_cpu(i->j.seq) - 1,
- last_seq(j), end_seq);
+ journal_last_seq(j), end_seq);
cur_seq = le64_to_cpu(i->j.seq) + 1;
}
bch_info(c, "journal read done, %i keys in %i entries, seq %llu",
- keys, entries, (u64) atomic64_read(&j->seq));
+ keys, entries, journal_cur_seq(j));
fsck_err:
return ret;
}
{
struct journal_buf *w = journal_prev_buf(j);
- atomic_dec_bug(&journal_seq_pin(j, w->data->seq)->count);
+ atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
if (!need_write_just_set &&
test_bit(JOURNAL_NEED_WRITE, &j->flags))
#endif
}
-static void __journal_entry_new(struct journal *j, int count)
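+/* bump j->seq and push a new (empty) pin list onto the pin fifo: */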
+static void journal_pin_new_entry(struct journal *j, int count)
{
- struct journal_entry_pin_list *p = fifo_push_ref(&j->pin);
+ struct journal_entry_pin_list *p;
/*
* The fifo_push() needs to happen at the same time as j->seq is
- * incremented for last_seq() to be calculated correctly
+ * incremented for journal_last_seq() to be calculated correctly
*/
atomic64_inc(&j->seq);
-
- BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
- &fifo_peek_back(&j->pin));
+ p = fifo_push_ref(&j->pin);
INIT_LIST_HEAD(&p->list);
INIT_LIST_HEAD(&p->flushed);
atomic_set(&p->count, count);
+ p->devs.nr = 0;
}
-static void __bch2_journal_next_entry(struct journal *j)
+static void bch2_journal_buf_init(struct journal *j)
{
- struct journal_buf *buf;
-
- __journal_entry_new(j, 1);
+ struct journal_buf *buf = journal_cur_buf(j);
- buf = journal_cur_buf(j);
memset(buf->has_inode, 0, sizeof(buf->has_inode));
memset(buf->data, 0, sizeof(*buf->data));
- buf->data->seq = cpu_to_le64(atomic64_read(&j->seq));
+ buf->data->seq = cpu_to_le64(journal_cur_seq(j));
buf->data->u64s = 0;
}
} while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v);
- journal_reclaim_fast(j);
-
clear_bit(JOURNAL_NEED_WRITE, &j->flags);
buf = &j->buf[old.idx];
buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
- buf->data->last_seq = cpu_to_le64(last_seq(j));
j->prev_buf_sectors =
vstruct_blocks_plus(buf->data, c->block_bits,
journal_entry_u64s_reserve(buf)) *
c->opts.block_size;
-
BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
- __bch2_journal_next_entry(j);
+ journal_reclaim_fast(j);
+ /* XXX: why set this here, and not in journal_write()? */
+ buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
+
+ journal_pin_new_entry(j, 1);
+
+ bch2_journal_buf_init(j);
cancel_delayed_work(&j->write_work);
spin_unlock(&j->lock);
} while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v);
- wake_up(&j->wait);
+ journal_wake(j);
closure_wake_up(&journal_cur_buf(j)->wait);
closure_wake_up(&journal_prev_buf(j)->wait);
}
* Don't use the last bucket unless writing the new last_seq
* will make another bucket available:
*/
- if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
+ if (ja->bucket_seq[ja->last_idx] >= journal_last_seq(j))
available = max((int) available - 1, 0);
return available;
/*
* should _only_ be called from journal_res_get() - when we actually want a
* journal reservation - an open journal entry means the journal is dirty:
+ *
+ * returns:
+ * 1: success
+ * 0: journal currently full (must wait)
+ * -EROFS: insufficient rw devices
+ * -EIO: journal error
*/
static int journal_entry_open(struct journal *j)
{
struct journal_buf *buf = journal_cur_buf(j);
+ union journal_res_state old, new;
ssize_t u64s;
- int ret = 0, sectors;
+ int sectors;
+ u64 v;
lockdep_assert_held(&j->lock);
BUG_ON(journal_entry_is_open(j));
BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
- if (u64s > le32_to_cpu(buf->data->u64s)) {
- union journal_res_state old, new;
- u64 v = atomic64_read(&j->reservations.counter);
-
- /*
- * Must be set before marking the journal entry as open:
- */
- j->cur_entry_u64s = u64s;
-
- do {
- old.v = new.v = v;
+ if (u64s <= le32_to_cpu(buf->data->u64s))
+ return 0;
- if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
- return false;
+ /*
+ * Must be set before marking the journal entry as open:
+ */
+ j->cur_entry_u64s = u64s;
- /* Handle any already added entries */
- new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
- } while ((v = atomic64_cmpxchg(&j->reservations.counter,
- old.v, new.v)) != old.v);
- ret = 1;
+ v = atomic64_read(&j->reservations.counter);
+ do {
+ old.v = new.v = v;
- wake_up(&j->wait);
+ if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+ return -EIO;
- if (j->res_get_blocked_start) {
- __bch2_time_stats_update(j->blocked_time,
- j->res_get_blocked_start);
- j->res_get_blocked_start = 0;
- }
+ /* Handle any already added entries */
+ new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
+ } while ((v = atomic64_cmpxchg(&j->reservations.counter,
+ old.v, new.v)) != old.v);
- mod_delayed_work(system_freezable_wq,
- &j->write_work,
- msecs_to_jiffies(j->write_delay_ms));
- }
+ if (j->res_get_blocked_start)
+ __bch2_time_stats_update(j->blocked_time,
+ j->res_get_blocked_start);
+ j->res_get_blocked_start = 0;
- return ret;
+ mod_delayed_work(system_freezable_wq,
+ &j->write_work,
+ msecs_to_jiffies(j->write_delay_ms));
+ journal_wake(j);
+ return 1;
}
void bch2_journal_start(struct bch_fs *c)
set_bit(JOURNAL_STARTED, &j->flags);
- while (atomic64_read(&j->seq) < new_seq)
- __journal_entry_new(j, 0);
+ while (journal_cur_seq(j) < new_seq)
+ journal_pin_new_entry(j, 0);
/*
* journal_buf_switch() only inits the next journal entry when it
* closes an open journal entry - the very first journal entry gets
* initialized here:
*/
- __bch2_journal_next_entry(j);
+ journal_pin_new_entry(j, 1);
+ bch2_journal_buf_init(j);
+
+ spin_unlock(&j->lock);
/*
* Adding entries to the next journal entry before allocating space on
bl->written = true;
}
- spin_unlock(&j->lock);
-
queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
}
struct bkey_i *k, *_n;
struct jset_entry *entry;
struct journal_replay *i, *n;
- int ret = 0, did_replay = 0;
+ int ret = 0;
list_for_each_entry_safe(i, n, list, list) {
j->replay_pin_list =
journal_seq_pin(j, le64_to_cpu(i->j.seq));
for_each_jset_key(k, _n, entry, &i->j) {
- struct disk_reservation disk_res;
if (entry->btree_id == BTREE_ID_ALLOC) {
/*
*/
ret = bch2_alloc_replay_key(c, k->k.p);
} else {
-
/*
* We might cause compressed extents to be
* split, so we need to pass in a
* disk_reservation:
*/
- BUG_ON(bch2_disk_reservation_get(c, &disk_res, 0, 0));
+ struct disk_reservation disk_res =
+ bch2_disk_reservation_init(c, 0);
ret = bch2_btree_insert(c, entry->btree_id, k,
&disk_res, NULL, NULL,
BTREE_INSERT_NOFAIL|
BTREE_INSERT_JOURNAL_REPLAY);
- bch2_disk_reservation_put(c, &disk_res);
}
if (ret) {
}
cond_resched();
- did_replay = true;
}
if (atomic_dec_and_test(&j->replay_pin_list->count))
- wake_up(&j->wait);
+ journal_wake(j);
}
j->replay_pin_list = NULL;
- if (did_replay) {
- bch2_journal_flush_pins(&c->journal, U64_MAX);
-
- /*
- * Write a new journal entry _before_ we start journalling new data -
- * otherwise, we could end up with btree node bsets with journal seqs
- * arbitrarily far in the future vs. the most recently written journal
- * entry on disk, if we crash before writing the next journal entry:
- */
- ret = bch2_journal_meta(j);
- if (ret) {
- bch_err(c, "journal replay: error %d flushing journal", ret);
- goto err;
- }
- }
-
bch2_journal_set_replay_done(j);
+ ret = bch2_journal_flush_all_pins(j);
err:
bch2_journal_entries_free(list);
return ret;
*/
if (bch2_disk_reservation_get(c, &disk_res,
- bucket_to_sector(ca, nr - ja->nr), 0))
+ bucket_to_sector(ca, nr - ja->nr), 1, 0))
return -ENOSPC;
mutex_lock(&c->sb_lock);
memcpy(new_bucket_seq, ja->bucket_seq, ja->nr * sizeof(u64));
swap(new_buckets, ja->buckets);
swap(new_bucket_seq, ja->bucket_seq);
+ spin_unlock(&j->lock);
while (ja->nr < nr) {
- /* must happen under journal lock, to avoid racing with gc: */
- long b = bch2_bucket_alloc(c, ca, RESERVE_ALLOC);
- if (b < 0) {
- if (!closure_wait(&c->freelist_wait, &cl)) {
- spin_unlock(&j->lock);
+ struct open_bucket *ob;
+ size_t bucket;
+ int ob_idx;
+
+ ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, &cl);
+ if (ob_idx < 0) {
+ if (!closure_wait(&c->freelist_wait, &cl))
closure_sync(&cl);
- spin_lock(&j->lock);
- }
continue;
}
- bch2_mark_metadata_bucket(ca, &ca->buckets[b],
- BUCKET_JOURNAL, false);
- bch2_mark_alloc_bucket(ca, &ca->buckets[b], false);
+ ob = c->open_buckets + ob_idx;
+ bucket = sector_to_bucket(ca, ob->ptr.offset);
- memmove(ja->buckets + ja->last_idx + 1,
- ja->buckets + ja->last_idx,
- (ja->nr - ja->last_idx) * sizeof(u64));
- memmove(ja->bucket_seq + ja->last_idx + 1,
- ja->bucket_seq + ja->last_idx,
- (ja->nr - ja->last_idx) * sizeof(u64));
- memmove(journal_buckets->buckets + ja->last_idx + 1,
- journal_buckets->buckets + ja->last_idx,
- (ja->nr - ja->last_idx) * sizeof(u64));
+ spin_lock(&j->lock);
+ __array_insert_item(ja->buckets, ja->nr, ja->last_idx);
+ __array_insert_item(ja->bucket_seq, ja->nr, ja->last_idx);
+ __array_insert_item(journal_buckets->buckets, ja->nr, ja->last_idx);
- ja->buckets[ja->last_idx] = b;
- journal_buckets->buckets[ja->last_idx] = cpu_to_le64(b);
+ ja->buckets[ja->last_idx] = bucket;
+ ja->bucket_seq[ja->last_idx] = 0;
+ journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);
if (ja->last_idx < ja->nr) {
if (ja->cur_idx >= ja->last_idx)
ja->last_idx++;
}
ja->nr++;
+ spin_unlock(&j->lock);
- }
- spin_unlock(&j->lock);
+ bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
+ ca->mi.bucket_size,
+ gc_phase(GC_PHASE_SB), 0);
- BUG_ON(bch2_sb_validate_journal(ca->disk_sb.sb, ca->mi));
+ bch2_open_bucket_put(c, ob);
+ }
bch2_write_super(c);
if (!ret)
bch2_dev_allocator_add(c, ca);
+ closure_sync(&cl);
+
return ret;
}
-int bch2_dev_journal_alloc(struct bch_dev *ca)
+int bch2_dev_journal_alloc(struct bch_fs *c, struct bch_dev *ca)
{
unsigned nr;
min(1 << 10,
(1 << 20) / ca->mi.bucket_size));
- return bch2_set_nr_journal_buckets(ca->fs, ca, nr);
+ return bch2_set_nr_journal_buckets(c, ca, nr);
}
/* Journalling */
}
if (popped)
- wake_up(&j->wait);
+ journal_wake(j);
}
/*
journal_pin_flush_fn flush_fn)
{
BUG_ON(journal_pin_active(pin));
+ BUG_ON(!atomic_read(&pin_list->count));
atomic_inc(&pin_list->count);
pin->pin_list = pin_list;
list_add(&pin->list, &pin_list->list);
else
INIT_LIST_HEAD(&pin->list);
+
+ /*
+ * If the journal is currently full, we might want to call flush_fn
+ * immediately:
+ */
+ journal_wake(j);
}
static void journal_pin_add_entry(struct journal *j,
struct journal_entry_pin *pin,
journal_pin_flush_fn flush_fn)
{
- spin_lock_irq(&j->pin_lock);
+ spin_lock(&j->lock);
__journal_pin_add(j, pin_list, pin, flush_fn);
- spin_unlock_irq(&j->pin_lock);
+ spin_unlock(&j->lock);
}
void bch2_journal_pin_add(struct journal *j,
? journal_seq_pin(j, res->seq)
: j->replay_pin_list;
- spin_lock_irq(&j->pin_lock);
+ spin_lock(&j->lock);
__journal_pin_add(j, pin_list, pin, flush_fn);
- spin_unlock_irq(&j->pin_lock);
+ spin_unlock(&j->lock);
}
-static inline bool __journal_pin_drop(struct journal *j,
+static inline void __journal_pin_drop(struct journal *j,
struct journal_entry_pin *pin)
{
struct journal_entry_pin_list *pin_list = pin->pin_list;
- pin->pin_list = NULL;
+ if (!journal_pin_active(pin))
+ return;
- /* journal_reclaim_work() might have already taken us off the list */
- if (!list_empty_careful(&pin->list))
- list_del_init(&pin->list);
+ pin->pin_list = NULL;
+ list_del_init(&pin->list);
- return atomic_dec_and_test(&pin_list->count);
+ /*
+ * Unpinning a journal entry may make journal_next_bucket() succeed, if
+ * writing a new last_seq will now make another bucket available:
+ */
+ if (atomic_dec_and_test(&pin_list->count) &&
+ pin_list == &fifo_peek_front(&j->pin))
+ journal_reclaim_fast(j);
}
void bch2_journal_pin_drop(struct journal *j,
struct journal_entry_pin *pin)
{
- unsigned long flags;
- bool wakeup = false;
-
- spin_lock_irqsave(&j->pin_lock, flags);
- if (journal_pin_active(pin))
- wakeup = __journal_pin_drop(j, pin);
- spin_unlock_irqrestore(&j->pin_lock, flags);
-
- /*
- * Unpinning a journal entry make make journal_next_bucket() succeed, if
- * writing a new last_seq will now make another bucket available:
- *
- * Nested irqsave is expensive, don't do the wakeup with lock held:
- */
- if (wakeup)
- wake_up(&j->wait);
+ spin_lock(&j->lock);
+ __journal_pin_drop(j, pin);
+ spin_unlock(&j->lock);
}
void bch2_journal_pin_add_if_older(struct journal *j,
struct journal_entry_pin *pin,
journal_pin_flush_fn flush_fn)
{
- spin_lock_irq(&j->pin_lock);
+ spin_lock(&j->lock);
if (journal_pin_active(src_pin) &&
(!journal_pin_active(pin) ||
- fifo_entry_idx(&j->pin, src_pin->pin_list) <
- fifo_entry_idx(&j->pin, pin->pin_list))) {
- if (journal_pin_active(pin))
- __journal_pin_drop(j, pin);
+ journal_pin_seq(j, src_pin->pin_list) <
+ journal_pin_seq(j, pin->pin_list))) {
+ __journal_pin_drop(j, pin);
__journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
}
- spin_unlock_irq(&j->pin_lock);
+ spin_unlock(&j->lock);
}
static struct journal_entry_pin *
-journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
+__journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
{
struct journal_entry_pin_list *pin_list;
- struct journal_entry_pin *ret = NULL;
- unsigned iter;
+ struct journal_entry_pin *ret;
+ u64 iter;
- /* so we don't iterate over empty fifo entries below: */
- if (!atomic_read(&fifo_peek_front(&j->pin).count)) {
- spin_lock(&j->lock);
- journal_reclaim_fast(j);
- spin_unlock(&j->lock);
- }
+ /* no need to iterate over empty fifo entries: */
+ journal_reclaim_fast(j);
- spin_lock_irq(&j->pin_lock);
fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
- if (journal_pin_seq(j, pin_list) > seq_to_flush)
+ if (iter > seq_to_flush)
break;
ret = list_first_entry_or_null(&pin_list->list,
if (ret) {
/* must be list_del_init(), see bch2_journal_pin_drop() */
list_move(&ret->list, &pin_list->flushed);
- *seq = journal_pin_seq(j, pin_list);
- break;
+ *seq = iter;
+ return ret;
}
}
- spin_unlock_irq(&j->pin_lock);
- return ret;
+ return NULL;
}
-static bool journal_flush_done(struct journal *j, u64 seq_to_flush)
+static struct journal_entry_pin *
+journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
{
- bool ret;
+ struct journal_entry_pin *ret;
spin_lock(&j->lock);
- journal_reclaim_fast(j);
+ ret = __journal_get_next_pin(j, seq_to_flush, seq);
+ spin_unlock(&j->lock);
+
+ return ret;
+}
+
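+/*
+ * wait_event() condition for bch2_journal_flush_pins(): returns nonzero when
+ * we have a pin to flush, hit an error, or have nothing left to wait on:
+ */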
+static int journal_flush_done(struct journal *j, u64 seq_to_flush,
+ struct journal_entry_pin **pin,
+ u64 *pin_seq)
+{
+ int ret;
+
+ *pin = NULL;
- ret = (fifo_used(&j->pin) == 1 &&
- atomic_read(&fifo_peek_front(&j->pin).count) == 1) ||
- last_seq(j) > seq_to_flush;
+ ret = bch2_journal_error(j);
+ if (ret)
+ return ret;
+
+ spin_lock(&j->lock);
+ /*
+ * If journal replay hasn't completed, the unreplayed journal entries
+ * hold refs on their corresponding sequence numbers
+ */
+ ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) != NULL ||
+ !test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
+ journal_last_seq(j) > seq_to_flush ||
+ (fifo_used(&j->pin) == 1 &&
+ atomic_read(&fifo_peek_front(&j->pin).count) == 1);
spin_unlock(&j->lock);
return ret;
}
-void bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
+int bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_entry_pin *pin;
u64 pin_seq;
+ bool flush;
if (!test_bit(JOURNAL_STARTED, &j->flags))
- return;
-
- while ((pin = journal_get_next_pin(j, seq_to_flush, &pin_seq)))
+ return 0;
+again:
+ wait_event(j->wait, journal_flush_done(j, seq_to_flush, &pin, &pin_seq));
+ if (pin) {
+ /* flushing a journal pin might cause a new one to be added: */
pin->flush(j, pin, pin_seq);
+ goto again;
+ }
- wait_event(j->wait,
- journal_flush_done(j, seq_to_flush) ||
- bch2_journal_error(j));
+ spin_lock(&j->lock);
+ flush = journal_last_seq(j) != j->last_seq_ondisk ||
+ (seq_to_flush == U64_MAX && c->btree_roots_dirty);
+ spin_unlock(&j->lock);
+
+ return flush ? bch2_journal_meta(j) : 0;
+}
+
+int bch2_journal_flush_all_pins(struct journal *j)
+{
+ return bch2_journal_flush_pins(j, U64_MAX);
}
static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
ja->last_idx = (ja->last_idx + 1) % ja->nr;
spin_unlock(&j->lock);
- wake_up(&j->wait);
+ journal_wake(j);
}
/*
mutex_unlock(&j->reclaim_lock);
/* Also flush if the pin fifo is more than half full */
+ spin_lock(&j->lock);
seq_to_flush = max_t(s64, seq_to_flush,
- (s64) atomic64_read(&j->seq) -
+ (s64) journal_cur_seq(j) -
(j->pin.size >> 1));
+ spin_unlock(&j->lock);
/*
* If it's been longer than j->reclaim_delay_ms since we last flushed,
/**
* journal_next_bucket - move on to the next journal bucket if possible
*/
-static int journal_write_alloc(struct journal *j, unsigned sectors)
+static int journal_write_alloc(struct journal *j, struct journal_buf *w,
+ unsigned sectors)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
- struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
+ struct bkey_s_extent e;
struct bch_extent_ptr *ptr;
struct journal_device *ja;
struct bch_dev *ca;
READ_ONCE(c->opts.metadata_replicas);
spin_lock(&j->lock);
+ e = bkey_i_to_s_extent(&j->key);
/*
* Drop any pointers to devices that have been removed, are no longer
* i.e. whichever device was limiting the current journal entry size.
*/
extent_for_each_ptr_backwards(e, ptr) {
- ca = c->devs[ptr->dev];
+ ca = bch_dev_bkey_exists(c, ptr->dev);
if (ca->mi.state != BCH_MEMBER_STATE_RW ||
ca->journal.sectors_free <= sectors)
ja->sectors_free = ca->mi.bucket_size - sectors;
ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
- ja->bucket_seq[ja->cur_idx] = atomic64_read(&j->seq);
+ ja->bucket_seq[ja->cur_idx] = le64_to_cpu(w->data->seq);
extent_ptr_append(bkey_i_to_extent(&j->key),
(struct bch_extent_ptr) {
rcu_read_unlock();
j->prev_buf_sectors = 0;
+
+ bkey_copy(&w->key, &j->key);
spin_unlock(&j->lock);
if (replicas < c->opts.metadata_replicas_required)
static void journal_write_done(struct closure *cl)
{
struct journal *j = container_of(cl, struct journal, io);
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_buf *w = journal_prev_buf(j);
+ struct bch_devs_list devs =
+ bch2_extent_devs(bkey_i_to_s_c_extent(&w->key));
+ if (!devs.nr) {
+ bch_err(c, "unable to write journal to sufficient devices");
+ goto err;
+ }
+
+ if (bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs))
+ goto err;
+out:
__bch2_time_stats_update(j->write_time, j->write_start_time);
spin_lock(&j->lock);
j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
+ journal_seq_pin(j, le64_to_cpu(w->data->seq))->devs = devs;
+
/*
* Updating last_seq_ondisk may let journal_reclaim_work() discard more
* buckets:
&j->reservations.counter);
closure_wake_up(&w->wait);
- wake_up(&j->wait);
+ journal_wake(j);
if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
mod_delayed_work(system_freezable_wq, &j->write_work, 0);
spin_unlock(&j->lock);
-}
-
-static void journal_write_error(struct closure *cl)
-{
- struct journal *j = container_of(cl, struct journal, io);
- struct bch_fs *c = container_of(j, struct bch_fs, journal);
- struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
-
- while (j->replicas_failed) {
- unsigned idx = __fls(j->replicas_failed);
-
- bch2_extent_drop_ptr_idx(e, idx);
- j->replicas_failed ^= 1 << idx;
- }
-
- if (!bch2_extent_nr_ptrs(e.c)) {
- bch_err(c, "unable to write journal to sufficient devices");
- goto err;
- }
-
- if (bch2_check_mark_super(c, e.c, BCH_DATA_JOURNAL))
- goto err;
-
-out:
- journal_write_done(cl);
return;
err:
bch2_fatal_error(c);
struct bch_dev *ca = bio->bi_private;
struct journal *j = &ca->fs->journal;
- if (bch2_dev_io_err_on(bio->bi_error, ca, "journal write") ||
+ if (bch2_dev_io_err_on(bio->bi_status, ca, "journal write") ||
bch2_meta_write_fault("journal")) {
- /* Was this a flush or an actual journal write? */
- if (ca->journal.ptr_idx != U8_MAX) {
- set_bit(ca->journal.ptr_idx, &j->replicas_failed);
- set_closure_fn(&j->io, journal_write_error,
- system_highpri_wq);
- }
+ struct journal_buf *w = journal_prev_buf(j);
+ unsigned long flags;
+
+ spin_lock_irqsave(&j->err_lock, flags);
+ bch2_extent_drop_device(bkey_i_to_s_extent(&w->key), ca->dev_idx);
+ spin_unlock_irqrestore(&j->err_lock, flags);
}
closure_put(&j->io);
struct jset *jset;
struct bio *bio;
struct bch_extent_ptr *ptr;
- unsigned i, sectors, bytes, ptr_idx = 0;
+ unsigned i, sectors, bytes;
journal_buf_realloc(j, w);
jset = w->data;
if (r->alive)
bch2_journal_add_btree_root(w, i, &r->key, r->level);
}
+ c->btree_roots_dirty = false;
mutex_unlock(&c->btree_root_lock);
journal_write_compact(jset);
SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
- __journal_entry_validate(c, jset, WRITE))
+ journal_entry_validate_entries(c, jset, WRITE))
goto err;
bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
journal_nonce(jset), jset);
if (!bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
- __journal_entry_validate(c, jset, WRITE))
+ journal_entry_validate_entries(c, jset, WRITE))
goto err;
sectors = vstruct_sectors(jset, c->block_bits);
bytes = vstruct_bytes(w->data);
memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
- if (journal_write_alloc(j, sectors)) {
+ if (journal_write_alloc(j, w, sectors)) {
bch2_journal_halt(j);
bch_err(c, "Unable to allocate journal write");
bch2_fatal_error(c);
continue_at(cl, journal_write_done, system_highpri_wq);
}
- if (bch2_check_mark_super(c, bkey_i_to_s_c_extent(&j->key),
- BCH_DATA_JOURNAL))
- goto err;
-
/*
* XXX: we really should just disable the entire journal in nochanges
* mode
if (c->opts.nochanges)
goto no_io;
- extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr) {
- ca = c->devs[ptr->dev];
+ extent_for_each_ptr(bkey_i_to_s_extent(&w->key), ptr) {
+ ca = bch_dev_bkey_exists(c, ptr->dev);
if (!percpu_ref_tryget(&ca->io_ref)) {
/* XXX: fix this */
bch_err(c, "missing device for journal write\n");
this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_JOURNAL],
sectors);
- ca->journal.ptr_idx = ptr_idx++;
bio = ca->journal.bio;
bio_reset(bio);
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_iter.bi_sector = ptr->offset;
- bio->bi_bdev = ca->disk_sb.bdev;
bio->bi_iter.bi_size = sectors << 9;
bio->bi_end_io = journal_write_endio;
bio->bi_private = ca;
for_each_rw_member(ca, c, i)
if (journal_flushes_device(ca) &&
- !bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key), i)) {
+ !bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), i)) {
percpu_ref_get(&ca->io_ref);
- ca->journal.ptr_idx = U8_MAX;
bio = ca->journal.bio;
bio_reset(bio);
- bio->bi_bdev = ca->disk_sb.bdev;
+ bio_set_dev(bio, ca->disk_sb.bdev);
bio->bi_opf = REQ_OP_FLUSH;
bio->bi_end_io = journal_write_endio;
bio->bi_private = ca;
continue_at(cl, journal_write_done, system_highpri_wq);
}
-static void journal_write_work(struct work_struct *work)
+/*
+ * returns true if there's nothing to flush and no journal write still in flight
+ */
+static bool journal_flush_write(struct journal *j)
{
- struct journal *j = container_of(to_delayed_work(work),
- struct journal, write_work);
+ bool ret;
+
spin_lock(&j->lock);
+ ret = !j->reservations.prev_buf_unwritten;
+
if (!journal_entry_is_open(j)) {
spin_unlock(&j->lock);
- return;
+ return ret;
}
set_bit(JOURNAL_NEED_WRITE, &j->flags);
- if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
+ if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
+ ret = false;
+ else
spin_unlock(&j->lock);
+ return ret;
+}
+
+static void journal_write_work(struct work_struct *work)
+{
+ struct journal *j = container_of(work, struct journal, write_work.work);
+
+ journal_flush_write(j);
}
/*
spin_lock(&j->lock);
if (test_bit(h, journal_cur_buf(j)->has_inode))
- seq = atomic64_read(&j->seq);
+ seq = journal_cur_seq(j);
else if (test_bit(h, journal_prev_buf(j)->has_inode))
- seq = atomic64_read(&j->seq) - 1;
+ seq = journal_cur_seq(j) - 1;
spin_unlock(&j->lock);
return seq;
return ret < 0 ? ret : 0;
}
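+/* seq of the oldest journal entry that hasn't yet been written: */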
+u64 bch2_journal_last_unwritten_seq(struct journal *j)
+{
+ u64 seq;
+
+ spin_lock(&j->lock);
+ seq = journal_cur_seq(j);
+ if (j->reservations.prev_buf_unwritten)
+ seq--;
+ spin_unlock(&j->lock);
+
+ return seq;
+}
+
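+/*
+ * try to open the journal entry @seq, so that it will eventually be written:
+ * returns 1 if nothing to do or the entry was opened, 0 if the journal is
+ * currently full (@parent waits on j->async_wait), or a negative error code:
+ */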
+int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
+{
+ int ret;
+
+ spin_lock(&j->lock);
+ BUG_ON(seq > journal_cur_seq(j));
+
+ if (seq < journal_cur_seq(j) ||
+ journal_entry_is_open(j)) {
+ spin_unlock(&j->lock);
+ return 1;
+ }
+
+ ret = journal_entry_open(j);
+ if (!ret)
+ closure_wait(&j->async_wait, parent);
+ spin_unlock(&j->lock);
+
+ if (!ret)
+ journal_reclaim_work(&j->reclaim_work.work);
+
+ return ret;
+}
+
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
spin_lock(&j->lock);
- BUG_ON(seq > atomic64_read(&j->seq));
+ BUG_ON(seq > journal_cur_seq(j));
if (bch2_journal_error(j)) {
spin_unlock(&j->lock);
return;
}
- if (seq == atomic64_read(&j->seq)) {
+ if (seq == journal_cur_seq(j)) {
if (!closure_wait(&journal_cur_buf(j)->wait, parent))
BUG();
- } else if (seq + 1 == atomic64_read(&j->seq) &&
+ } else if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
if (!closure_wait(&journal_prev_buf(j)->wait, parent))
BUG();
spin_lock(&j->lock);
- BUG_ON(seq > atomic64_read(&j->seq));
+ BUG_ON(seq > journal_cur_seq(j));
if (bch2_journal_error(j)) {
spin_unlock(&j->lock);
return;
}
- if (seq == atomic64_read(&j->seq)) {
+ if (seq == journal_cur_seq(j)) {
bool set_need_write = false;
buf = journal_cur_buf(j);
case JOURNAL_ENTRY_CLOSED:
/*
* Journal entry hasn't been opened yet, but caller
- * claims it has something (seq == j->seq):
+ * claims it has something
*/
BUG();
case JOURNAL_ENTRY_INUSE:
return;
}
} else if (parent &&
- seq + 1 == atomic64_read(&j->seq) &&
+ seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
buf = journal_prev_buf(j);
int ret = 1;
spin_lock(&j->lock);
- BUG_ON(seq > atomic64_read(&j->seq));
+ BUG_ON(seq > journal_cur_seq(j));
- if (seq == atomic64_read(&j->seq)) {
+ if (seq == journal_cur_seq(j)) {
bool set_need_write = false;
ret = 0;
case JOURNAL_ENTRY_CLOSED:
/*
* Journal entry hasn't been opened yet, but caller
- * claims it has something (seq == j->seq):
+ * claims it has something
*/
BUG();
case JOURNAL_ENTRY_INUSE:
case JOURNAL_UNLOCKED:
return 0;
}
- } else if (seq + 1 == atomic64_read(&j->seq) &&
+ } else if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten) {
ret = bch2_journal_error(j);
}
u64 seq, journal_seq;
spin_lock(&j->lock);
- journal_seq = atomic64_read(&j->seq);
+ journal_seq = journal_cur_seq(j);
if (journal_entry_is_open(j)) {
seq = journal_seq;
u64 seq, journal_seq;
spin_lock(&j->lock);
- journal_seq = atomic64_read(&j->seq);
+ journal_seq = journal_cur_seq(j);
if (journal_entry_is_open(j)) {
seq = journal_seq;
return bch2_journal_flush_seq(j, seq);
}
-ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
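+/*
+ * flush journal entries with data on @dev_idx (or, if @dev_idx < 0, entries
+ * with fewer than metadata_replicas copies), then update the journal replicas
+ * entries for what's still dirty:
+ */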
+int bch2_journal_flush_device(struct journal *j, int dev_idx)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
- union journal_res_state *s = &j->reservations;
- struct bch_dev *ca;
- unsigned iter;
- ssize_t ret = 0;
+ struct journal_entry_pin_list *p;
+ struct bch_devs_list devs;
+ u64 iter, seq = 0;
+ int ret = 0;
- rcu_read_lock();
spin_lock(&j->lock);
-
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "active journal entries:\t%zu\n"
- "seq:\t\t\t%llu\n"
- "last_seq:\t\t%llu\n"
- "last_seq_ondisk:\t%llu\n"
- "reservation count:\t%u\n"
- "reservation offset:\t%u\n"
- "current entry u64s:\t%u\n"
- "io in flight:\t\t%i\n"
- "need write:\t\t%i\n"
- "dirty:\t\t\t%i\n"
- "replay done:\t\t%i\n",
- fifo_used(&j->pin),
- (u64) atomic64_read(&j->seq),
- last_seq(j),
- j->last_seq_ondisk,
- journal_state_count(*s, s->idx),
- s->cur_entry_offset,
- j->cur_entry_u64s,
- s->prev_buf_unwritten,
- test_bit(JOURNAL_NEED_WRITE, &j->flags),
- journal_entry_is_open(j),
- test_bit(JOURNAL_REPLAY_DONE, &j->flags));
-
- for_each_member_device_rcu(ca, c, iter,
- &c->rw_devs[BCH_DATA_JOURNAL]) {
- struct journal_device *ja = &ca->journal;
-
- if (!ja->nr)
- continue;
-
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "dev %u:\n"
- "\tnr\t\t%u\n"
- "\tcur_idx\t\t%u (seq %llu)\n"
- "\tlast_idx\t%u (seq %llu)\n",
- iter, ja->nr,
- ja->cur_idx, ja->bucket_seq[ja->cur_idx],
- ja->last_idx, ja->bucket_seq[ja->last_idx]);
- }
-
+ fifo_for_each_entry_ptr(p, &j->pin, iter)
+ if (dev_idx >= 0
+ ? bch2_dev_list_has_dev(p->devs, dev_idx)
+ : p->devs.nr < c->opts.metadata_replicas)
+ seq = iter;
spin_unlock(&j->lock);
- rcu_read_unlock();
- return ret;
-}
-
-ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
-{
- struct journal_entry_pin_list *pin_list;
- struct journal_entry_pin *pin;
- ssize_t ret = 0;
- unsigned i;
+ ret = bch2_journal_flush_pins(j, seq);
+ if (ret)
+ return ret;
- spin_lock_irq(&j->pin_lock);
- fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "%llu: count %u\n",
- journal_pin_seq(j, pin_list),
- atomic_read(&pin_list->count));
+ mutex_lock(&c->replicas_gc_lock);
+ bch2_replicas_gc_start(c, 1 << BCH_DATA_JOURNAL);
- list_for_each_entry(pin, &pin_list->list, list)
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "\t%p %pf\n",
- pin, pin->flush);
+ seq = 0;
- if (!list_empty(&pin_list->flushed))
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "flushed:\n");
+ spin_lock(&j->lock);
+ while (!ret && seq < j->pin.back) {
+ seq = max(seq, journal_last_seq(j));
+ devs = journal_seq_pin(j, seq)->devs;
+ seq++;
- list_for_each_entry(pin, &pin_list->flushed, list)
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "\t%p %pf\n",
- pin, pin->flush);
+ spin_unlock(&j->lock);
+ ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs);
+ spin_lock(&j->lock);
}
- spin_unlock_irq(&j->pin_lock);
+ spin_unlock(&j->lock);
+
+ bch2_replicas_gc_end(c, ret);
+ mutex_unlock(&c->replicas_gc_lock);
return ret;
}
-static bool bch2_journal_writing_to_device(struct bch_dev *ca)
+/* startup/shutdown: */
+
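+/* true if the journal write still in flight has a pointer to @dev_idx: */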
+static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
- struct journal *j = &ca->fs->journal;
+ union journal_res_state state;
+ struct journal_buf *w;
bool ret;
spin_lock(&j->lock);
- ret = bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key),
- ca->dev_idx);
+ state = READ_ONCE(j->reservations);
+ w = j->buf + !state.idx;
+
+ ret = state.prev_buf_unwritten &&
+ bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
spin_unlock(&j->lock);
return ret;
}
-/*
- * This asumes that ca has already been marked read-only so that
- * journal_next_bucket won't pick buckets out of ca any more.
- * Hence, if the journal is not currently pointing to ca, there
- * will be no new writes to journal entries in ca after all the
- * pending ones have been flushed to disk.
- *
- * If the journal is being written to ca, write a new record, and
- * journal_next_bucket will notice that the device is no longer
- * writeable and pick a new set of devices to write to.
- */
-
-int bch2_journal_move(struct bch_dev *ca)
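+/*
+ * stop issuing new journal writes to @ca, then wait for any write in flight to
+ * that device to complete:
+ */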
+void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
- struct journal_device *ja = &ca->journal;
- struct journal *j = &ca->fs->journal;
- u64 seq_to_flush = 0;
- unsigned i;
- int ret;
-
- if (bch2_journal_writing_to_device(ca)) {
- /*
- * bch_journal_meta will write a record and we'll wait
- * for the write to complete.
- * Actually writing the journal (journal_write_locked)
- * will call journal_next_bucket which notices that the
- * device is no longer writeable, and picks a new one.
- */
- bch2_journal_meta(j);
- BUG_ON(bch2_journal_writing_to_device(ca));
- }
-
- for (i = 0; i < ja->nr; i++)
- seq_to_flush = max(seq_to_flush, ja->bucket_seq[i]);
-
- bch2_journal_flush_pins(j, seq_to_flush);
-
- /*
- * Force a meta-data journal entry to be written so that
- * we have newer journal entries in devices other than ca,
- * and wait for the meta data write to complete.
- */
- bch2_journal_meta(j);
-
- /*
- * Verify that we no longer need any of the journal entries in
- * the device
- */
spin_lock(&j->lock);
- ret = j->last_seq_ondisk > seq_to_flush ? 0 : -EIO;
+ bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
spin_unlock(&j->lock);
- return ret;
+ wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}
void bch2_fs_journal_stop(struct journal *j)
{
- if (!test_bit(JOURNAL_STARTED, &j->flags))
- return;
-
- /*
- * Empty out the journal by first flushing everything pinning existing
- * journal entries, then force a brand new empty journal entry to be
- * written:
- */
- bch2_journal_flush_pins(j, U64_MAX);
- bch2_journal_flush_async(j, NULL);
- bch2_journal_meta(j);
+ wait_event(j->wait, journal_flush_write(j));
cancel_delayed_work_sync(&j->write_work);
cancel_delayed_work_sync(&j->reclaim_work);
int bch2_fs_journal_init(struct journal *j)
{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
static struct lock_class_key res_key;
+ int ret = 0;
+
+ pr_verbose_init(c->opts, "");
spin_lock_init(&j->lock);
- spin_lock_init(&j->pin_lock);
+ spin_lock_init(&j->err_lock);
init_waitqueue_head(&j->wait);
INIT_DELAYED_WORK(&j->write_work, journal_write_work);
INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
!(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
- !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL)))
- return -ENOMEM;
+ !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
+ ret = -ENOMEM;
+ goto out;
+ }
j->pin.front = j->pin.back = 1;
+out:
+ pr_verbose_init(c->opts, "ret %i", ret);
+ return ret;
+}
- return 0;
+/* debug: */
+
+ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+ union journal_res_state *s = &j->reservations;
+ struct bch_dev *ca;
+ unsigned iter;
+ ssize_t ret = 0;
+
+ rcu_read_lock();
+ spin_lock(&j->lock);
+
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "active journal entries:\t%llu\n"
+ "seq:\t\t\t%llu\n"
+ "last_seq:\t\t%llu\n"
+ "last_seq_ondisk:\t%llu\n"
+ "reservation count:\t%u\n"
+ "reservation offset:\t%u\n"
+ "current entry u64s:\t%u\n"
+ "io in flight:\t\t%i\n"
+ "need write:\t\t%i\n"
+ "dirty:\t\t\t%i\n"
+ "replay done:\t\t%i\n",
+ fifo_used(&j->pin),
+ journal_cur_seq(j),
+ journal_last_seq(j),
+ j->last_seq_ondisk,
+ journal_state_count(*s, s->idx),
+ s->cur_entry_offset,
+ j->cur_entry_u64s,
+ s->prev_buf_unwritten,
+ test_bit(JOURNAL_NEED_WRITE, &j->flags),
+ journal_entry_is_open(j),
+ test_bit(JOURNAL_REPLAY_DONE, &j->flags));
+
+ for_each_member_device_rcu(ca, c, iter,
+ &c->rw_devs[BCH_DATA_JOURNAL]) {
+ struct journal_device *ja = &ca->journal;
+
+ if (!ja->nr)
+ continue;
+
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "dev %u:\n"
+ "\tnr\t\t%u\n"
+ "\tcur_idx\t\t%u (seq %llu)\n"
+ "\tlast_idx\t%u (seq %llu)\n",
+ iter, ja->nr,
+ ja->cur_idx, ja->bucket_seq[ja->cur_idx],
+ ja->last_idx, ja->bucket_seq[ja->last_idx]);
+ }
+
+ spin_unlock(&j->lock);
+ rcu_read_unlock();
+
+ return ret;
+}
+
+ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
+{
+ struct journal_entry_pin_list *pin_list;
+ struct journal_entry_pin *pin;
+ ssize_t ret = 0;
+ u64 i;
+
+ spin_lock(&j->lock);
+ fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "%llu: count %u\n",
+ i, atomic_read(&pin_list->count));
+
+ list_for_each_entry(pin, &pin_list->list, list)
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "\t%p %pf\n",
+ pin, pin->flush);
+
+ if (!list_empty(&pin_list->flushed))
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "flushed:\n");
+
+ list_for_each_entry(pin, &pin_list->flushed, list)
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "\t%p %pf\n",
+ pin, pin->flush);
+ }
+ spin_unlock(&j->lock);
+
+ return ret;
}