2 * bcachefs journalling code, for btree insertions
4 * Copyright 2012 Google, Inc.
9 #include "bkey_methods.h"
12 #include "btree_update.h"
13 #include "btree_update_interior.h"
26 #include <trace/events/bcachefs.h>
28 static void journal_write(struct closure *);
29 static void journal_reclaim_fast(struct journal *);
30 static void journal_pin_add_entry(struct journal *,
31 struct journal_entry_pin_list *,
32 struct journal_entry_pin *,
33 journal_pin_flush_fn);
35 static inline void journal_wake(struct journal *j)
38 closure_wake_up(&j->async_wait);
41 static inline struct journal_buf *journal_cur_buf(struct journal *j)
43 return j->buf + j->reservations.idx;
46 static inline struct journal_buf *journal_prev_buf(struct journal *j)
48 return j->buf + !j->reservations.idx;
51 /* Sequence number of oldest dirty journal entry */
53 static inline u64 journal_last_seq(struct journal *j)
58 static inline u64 journal_cur_seq(struct journal *j)
60 BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq));
62 return j->pin.back - 1;
65 static inline u64 journal_pin_seq(struct journal *j,
66 struct journal_entry_pin_list *pin_list)
68 return fifo_entry_idx_abs(&j->pin, pin_list);
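/*
 * Note: the pin FIFO is indexed by absolute journal sequence number -
 * j->pin.front is journal_last_seq() (the oldest dirty entry) and
 * j->pin.back - 1 is journal_cur_seq() - so fifo_entry_idx_abs() maps a
 * pin list straight back to the sequence number it pins.
 */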
71 u64 bch2_journal_pin_seq(struct journal *j, struct journal_entry_pin *pin)
76 if (journal_pin_active(pin))
77 ret = journal_pin_seq(j, pin->pin_list);
78 spin_unlock(&j->lock);
83 static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf,
84 unsigned type, enum btree_id id,
86 const void *data, size_t u64s)
88 struct jset *jset = buf->data;
90 bch2_journal_add_entry_at(buf, le32_to_cpu(jset->u64s),
91 type, id, level, data, u64s);
92 le32_add_cpu(&jset->u64s, jset_u64s(u64s));
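/*
 * Space accounting sketch: jset->u64s counts every entry's u64s
 * including the entry headers, assuming jset_u64s() adds the per-entry
 * header overhead. E.g. appending a key with k->k.u64s == 3 grows
 * jset->u64s by 3 + sizeof(struct jset_entry) / sizeof(u64).
 */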
95 static struct jset_entry *bch2_journal_find_entry(struct jset *j, unsigned type,
98 struct jset_entry *entry;
100 for_each_jset_entry_type(entry, j, type)
101 if (entry->btree_id == id)
107 struct bkey_i *bch2_journal_find_btree_root(struct bch_fs *c, struct jset *j,
108 enum btree_id id, unsigned *level)
111 struct jset_entry *entry =
112 bch2_journal_find_entry(j, JOURNAL_ENTRY_BTREE_ROOT, id);
118 return ERR_PTR(-EINVAL);
121 *level = entry->level;
126 static void bch2_journal_add_btree_root(struct journal_buf *buf,
127 enum btree_id id, struct bkey_i *k,
130 bch2_journal_add_entry_noreservation(buf,
131 JOURNAL_ENTRY_BTREE_ROOT, id, level,
135 static void journal_seq_blacklist_flush(struct journal *j,
136 struct journal_entry_pin *pin, u64 seq)
139 container_of(j, struct bch_fs, journal);
140 struct journal_seq_blacklist *bl =
141 container_of(pin, struct journal_seq_blacklist, pin);
142 struct blacklisted_node n;
147 closure_init_stack(&cl);
150 struct btree_iter iter;
153 mutex_lock(&j->blacklist_lock);
154 if (i >= bl->nr_entries) {
155 mutex_unlock(&j->blacklist_lock);
159 mutex_unlock(&j->blacklist_lock);
161 __bch2_btree_iter_init(&iter, c, n.btree_id, n.pos, 0, 0, 0);
163 b = bch2_btree_iter_peek_node(&iter);
165 /* The node might have already been rewritten: */
167 if (b->data->keys.seq == n.seq) {
168 ret = bch2_btree_node_rewrite(c, &iter, n.seq, 0);
170 bch2_btree_iter_unlock(&iter);
171 bch2_fs_fatal_error(c,
172 "error %i rewriting btree node with blacklisted journal seq",
174 bch2_journal_halt(j);
179 bch2_btree_iter_unlock(&iter);
183 struct btree_update *as;
184 struct pending_btree_node_free *d;
186 mutex_lock(&j->blacklist_lock);
187 if (i >= bl->nr_entries) {
188 mutex_unlock(&j->blacklist_lock);
192 mutex_unlock(&j->blacklist_lock);
194 mutex_lock(&c->btree_interior_update_lock);
197 * Is the node on the list of pending interior node updates -
198 * being freed? If so, wait for that to finish:
200 for_each_pending_btree_node_free(c, as, d)
201 if (n.seq == d->seq &&
202 n.btree_id == d->btree_id &&
204 !bkey_cmp(n.pos, d->key.k.p)) {
205 closure_wait(&as->wait, &cl);
206 mutex_unlock(&c->btree_interior_update_lock);
211 mutex_unlock(&c->btree_interior_update_lock);
214 mutex_lock(&j->blacklist_lock);
216 bch2_journal_pin_drop(j, &bl->pin);
221 mutex_unlock(&j->blacklist_lock);
224 static struct journal_seq_blacklist *
225 journal_seq_blacklist_find(struct journal *j, u64 seq)
227 struct journal_seq_blacklist *bl;
229 lockdep_assert_held(&j->blacklist_lock);
231 list_for_each_entry(bl, &j->seq_blacklist, list)
238 static struct journal_seq_blacklist *
239 bch2_journal_seq_blacklisted_new(struct journal *j, u64 seq)
241 struct journal_seq_blacklist *bl;
243 lockdep_assert_held(&j->blacklist_lock);
246 * When we start the journal, bch2_journal_start() will skip over @seq:
249 bl = kzalloc(sizeof(*bl), GFP_KERNEL);
254 list_add_tail(&bl->list, &j->seq_blacklist);
259 * Returns true if @seq is newer than the most recent journal entry that got
260 * written, and data corresponding to @seq should be ignored - also marks @seq
261 * as blacklisted so that on future restarts the corresponding data will still
264 int bch2_journal_seq_should_ignore(struct bch_fs *c, u64 seq, struct btree *b)
266 struct journal *j = &c->journal;
267 struct journal_seq_blacklist *bl = NULL;
268 struct blacklisted_node *n;
276 journal_seq = journal_cur_seq(j);
277 spin_unlock(&j->lock);
279 /* Interior updates aren't journalled: */
281 BUG_ON(seq > journal_seq && test_bit(BCH_FS_INITIAL_GC_DONE, &c->flags));
284 * Decrease this back to j->seq + 2 when we next rev the on disk format:
285 * increasing it temporarily to work around bug in old kernels
287 bch2_fs_inconsistent_on(seq > journal_seq + 4, c,
288 "bset journal seq too far in the future: %llu > %llu",
291 if (seq <= journal_seq &&
292 list_empty_careful(&j->seq_blacklist))
295 mutex_lock(&j->blacklist_lock);
297 if (seq <= journal_seq) {
298 bl = journal_seq_blacklist_find(j, seq);
302 bch_verbose(c, "btree node %u:%llu:%llu has future journal sequence number %llu, blacklisting",
303 b->btree_id, b->key.k.p.inode, b->key.k.p.offset, seq);
305 for (i = journal_seq + 1; i <= seq; i++) {
306 bl = journal_seq_blacklist_find(j, i) ?:
307 bch2_journal_seq_blacklisted_new(j, i);
315 for (n = bl->entries; n < bl->entries + bl->nr_entries; n++)
316 if (b->data->keys.seq == n->seq &&
317 b->btree_id == n->btree_id &&
318 !bkey_cmp(b->key.k.p, n->pos))
321 if (!bl->nr_entries ||
322 is_power_of_2(bl->nr_entries)) {
323 n = krealloc(bl->entries,
324 max(bl->nr_entries * 2, 8UL) * sizeof(*n),
333 bl->entries[bl->nr_entries++] = (struct blacklisted_node) {
334 .seq = b->data->keys.seq,
335 .btree_id = b->btree_id,
341 mutex_unlock(&j->blacklist_lock);
346 * Journal replay/recovery:
348 * This code is all driven from bch2_fs_start(); we first read the journal
349 * entries, do some other stuff, then we mark all the keys in the journal
350 * entries (same as garbage collection would), then we replay them - reinserting
351 * them into the cache in precisely the same order as they appear in the
354 * We only journal keys that go in leaf nodes, which simplifies things quite a
358 struct journal_list {
361 struct list_head *head;
365 #define JOURNAL_ENTRY_ADD_OK 0
366 #define JOURNAL_ENTRY_ADD_OUT_OF_RANGE 5
369 * Given a journal entry we just read, add it to the list of journal entries to
372 static int journal_entry_add(struct bch_fs *c, struct bch_dev *ca,
373 struct journal_list *jlist, struct jset *j)
375 struct journal_replay *i, *pos;
376 struct list_head *where;
377 size_t bytes = vstruct_bytes(j);
381 last_seq = !list_empty(jlist->head)
382 ? list_last_entry(jlist->head, struct journal_replay,
386 /* Is this entry older than the range we need? */
387 if (le64_to_cpu(j->seq) < le64_to_cpu(last_seq)) {
388 ret = JOURNAL_ENTRY_ADD_OUT_OF_RANGE;
392 /* Drop entries we don't need anymore */
393 list_for_each_entry_safe(i, pos, jlist->head, list) {
394 if (le64_to_cpu(i->j.seq) >= le64_to_cpu(j->last_seq))
397 kvpfree(i, offsetof(struct journal_replay, j) +
398 vstruct_bytes(&i->j));
401 list_for_each_entry_reverse(i, jlist->head, list) {
403 if (le64_to_cpu(j->seq) == le64_to_cpu(i->j.seq)) {
404 fsck_err_on(bytes != vstruct_bytes(&i->j) ||
405 memcmp(j, &i->j, bytes), c,
406 "found duplicate but non identical journal entries (seq %llu)",
407 le64_to_cpu(j->seq));
411 if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
419 i = kvpmalloc(offsetof(struct journal_replay, j) + bytes, GFP_KERNEL);
425 list_add(&i->list, where);
427 memcpy(&i->j, j, bytes);
429 if (!bch2_dev_list_has_dev(i->devs, ca->dev_idx))
430 bch2_dev_list_add_dev(&i->devs, ca->dev_idx);
432 fsck_err_on(1, c, "duplicate journal entries on same device");
433 ret = JOURNAL_ENTRY_ADD_OK;
439 static struct nonce journal_nonce(const struct jset *jset)
441 return (struct nonce) {{
443 [1] = ((__le32 *) &jset->seq)[0],
444 [2] = ((__le32 *) &jset->seq)[1],
445 [3] = BCH_NONCE_JOURNAL,
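/*
 * The nonce is built from the jset's 64-bit sequence number plus the
 * BCH_NONCE_JOURNAL type tag: since sequence numbers never repeat, each
 * journal entry is encrypted with a distinct nonce without having to
 * store one.
 */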
449 /* this fills in a range with empty jset_entries: */
450 static void journal_entry_null_range(void *start, void *end)
452 struct jset_entry *entry;
454 for (entry = start; entry != end; entry = vstruct_next(entry))
455 memset(entry, 0, sizeof(*entry));
458 static int journal_validate_key(struct bch_fs *c, struct jset *jset,
459 struct jset_entry *entry,
460 struct bkey_i *k, enum bkey_type key_type,
463 void *next = vstruct_next(entry);
468 if (mustfix_fsck_err_on(!k->k.u64s, c,
469 "invalid %s in journal: k->u64s 0", type)) {
470 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
471 journal_entry_null_range(vstruct_next(entry), next);
475 if (mustfix_fsck_err_on((void *) bkey_next(k) >
476 (void *) vstruct_next(entry), c,
477 "invalid %s in journal: extends past end of journal entry",
479 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
480 journal_entry_null_range(vstruct_next(entry), next);
484 if (mustfix_fsck_err_on(k->k.format != KEY_FORMAT_CURRENT, c,
485 "invalid %s in journal: bad format %u",
486 type, k->k.format)) {
487 le16_add_cpu(&entry->u64s, -k->k.u64s);
488 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
489 journal_entry_null_range(vstruct_next(entry), next);
493 if (JSET_BIG_ENDIAN(jset) != CPU_BIG_ENDIAN)
494 bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
496 invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
498 bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
500 mustfix_fsck_err(c, "invalid %s in journal: %s\n%s",
503 le16_add_cpu(&entry->u64s, -k->k.u64s);
504 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
505 journal_entry_null_range(vstruct_next(entry), next);
512 #define JOURNAL_ENTRY_REREAD 5
513 #define JOURNAL_ENTRY_NONE 6
514 #define JOURNAL_ENTRY_BAD 7
516 #define journal_entry_err(c, msg, ...) \
518 if (write == READ) { \
519 mustfix_fsck_err(c, msg, ##__VA_ARGS__); \
521 bch_err(c, "detected corrupt metadata before write:\n" \
522 msg, ##__VA_ARGS__); \
523 ret = BCH_FSCK_ERRORS_NOT_FIXED; \
529 #define journal_entry_err_on(cond, c, msg, ...) \
530 ((cond) ? journal_entry_err(c, msg, ##__VA_ARGS__) : false)
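/*
 * That is: at read time a bad entry is a repairable fsck error, but if
 * validation fails while writing we're about to emit corrupt metadata,
 * so it's reported as BCH_FSCK_ERRORS_NOT_FIXED rather than repaired.
 */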
532 static int journal_entry_validate_entries(struct bch_fs *c, struct jset *jset,
535 struct jset_entry *entry;
538 vstruct_for_each(jset, entry) {
539 void *next = vstruct_next(entry);
542 if (journal_entry_err_on(vstruct_next(entry) >
543 vstruct_last(jset), c,
544 "journal entry extends past end of jset")) {
545 jset->u64s = cpu_to_le32((u64 *) entry - jset->_data);
549 switch (entry->type) {
550 case JOURNAL_ENTRY_BTREE_KEYS:
551 vstruct_for_each(entry, k) {
552 ret = journal_validate_key(c, jset, entry, k,
553 bkey_type(entry->level,
561 case JOURNAL_ENTRY_BTREE_ROOT:
564 if (journal_entry_err_on(!entry->u64s ||
565 le16_to_cpu(entry->u64s) != k->k.u64s, c,
566 "invalid btree root journal entry: wrong number of keys")) {
568 * we don't want to null out this jset_entry,
569 * just the contents, so that later we can tell
570 * we were _supposed_ to have a btree root
573 journal_entry_null_range(vstruct_next(entry), next);
577 ret = journal_validate_key(c, jset, entry, k,
578 BKEY_TYPE_BTREE, "btree root");
583 case JOURNAL_ENTRY_PRIO_PTRS:
586 case JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED:
587 if (journal_entry_err_on(le16_to_cpu(entry->u64s) != 1, c,
588 "invalid journal seq blacklist entry: bad size")) {
589 journal_entry_null_range(entry,
590 vstruct_next(entry));
595 journal_entry_err(c, "invalid journal entry type %u",
597 journal_entry_null_range(entry, vstruct_next(entry));
606 static int journal_entry_validate(struct bch_fs *c,
607 struct jset *jset, u64 sector,
608 unsigned bucket_sectors_left,
609 unsigned sectors_read,
612 size_t bytes = vstruct_bytes(jset);
613 struct bch_csum csum;
616 if (le64_to_cpu(jset->magic) != jset_magic(c))
617 return JOURNAL_ENTRY_NONE;
619 if (le32_to_cpu(jset->version) != BCACHE_JSET_VERSION) {
620 bch_err(c, "unknown journal entry version %u",
621 le32_to_cpu(jset->version));
622 return BCH_FSCK_UNKNOWN_VERSION;
625 if (journal_entry_err_on(bytes > bucket_sectors_left << 9, c,
626 "journal entry too big (%zu bytes), sector %lluu",
628 /* XXX: note we might have missing journal entries */
629 return JOURNAL_ENTRY_BAD;
632 if (bytes > sectors_read << 9)
633 return JOURNAL_ENTRY_REREAD;
635 if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(jset)), c,
636 "journal entry with unknown csum type %llu sector %lluu",
637 JSET_CSUM_TYPE(jset), sector))
638 return JOURNAL_ENTRY_BAD;
640 csum = csum_vstruct(c, JSET_CSUM_TYPE(jset), journal_nonce(jset), jset);
641 if (journal_entry_err_on(bch2_crc_cmp(csum, jset->csum), c,
642 "journal checksum bad, sector %llu", sector)) {
643 /* XXX: retry IO, when we start retrying checksum errors */
644 /* XXX: note we might have missing journal entries */
645 return JOURNAL_ENTRY_BAD;
648 bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
649 jset->encrypted_start,
650 vstruct_end(jset) - (void *) jset->encrypted_start);
652 if (journal_entry_err_on(le64_to_cpu(jset->last_seq) > le64_to_cpu(jset->seq), c,
653 "invalid journal entry: last_seq > seq"))
654 jset->last_seq = jset->seq;
661 struct journal_read_buf {
666 static int journal_read_buf_realloc(struct journal_read_buf *b,
671 /* the bios are sized for this many pages, max: */
672 if (new_size > JOURNAL_ENTRY_SIZE_MAX)
675 new_size = roundup_pow_of_two(new_size);
676 n = kvpmalloc(new_size, GFP_KERNEL);
680 kvpfree(b->data, b->size);
686 static int journal_read_bucket(struct bch_dev *ca,
687 struct journal_read_buf *buf,
688 struct journal_list *jlist,
689 unsigned bucket, u64 *seq, bool *entries_found)
691 struct bch_fs *c = ca->fs;
692 struct journal_device *ja = &ca->journal;
693 struct bio *bio = ja->bio;
694 struct jset *j = NULL;
695 unsigned sectors, sectors_read = 0;
696 u64 offset = bucket_to_sector(ca, ja->buckets[bucket]),
697 end = offset + ca->mi.bucket_size;
698 bool saw_bad = false;
701 pr_debug("reading %u", bucket);
703 while (offset < end) {
705 reread: sectors_read = min_t(unsigned,
706 end - offset, buf->size >> 9);
709 bio_set_dev(bio, ca->disk_sb.bdev);
710 bio->bi_iter.bi_sector = offset;
711 bio->bi_iter.bi_size = sectors_read << 9;
712 bio_set_op_attrs(bio, REQ_OP_READ, 0);
713 bch2_bio_map(bio, buf->data);
715 ret = submit_bio_wait(bio);
717 if (bch2_dev_io_err_on(ret, ca,
718 "journal read from sector %llu",
720 bch2_meta_read_fault("journal"))
726 ret = journal_entry_validate(c, j, offset,
727 end - offset, sectors_read,
732 case JOURNAL_ENTRY_REREAD:
733 if (vstruct_bytes(j) > buf->size) {
734 ret = journal_read_buf_realloc(buf,
740 case JOURNAL_ENTRY_NONE:
743 sectors = c->opts.block_size;
745 case JOURNAL_ENTRY_BAD:
747 sectors = c->opts.block_size;
754 * This happens sometimes if we don't have discards on -
755 * when we've partially overwritten a bucket with new
756 * journal entries. We don't need the rest of the
759 if (le64_to_cpu(j->seq) < ja->bucket_seq[bucket])
762 ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
764 mutex_lock(&jlist->lock);
765 ret = journal_entry_add(c, ca, jlist, j);
766 mutex_unlock(&jlist->lock);
769 case JOURNAL_ENTRY_ADD_OK:
770 *entries_found = true;
772 case JOURNAL_ENTRY_ADD_OUT_OF_RANGE:
778 if (le64_to_cpu(j->seq) > *seq)
779 *seq = le64_to_cpu(j->seq);
781 sectors = vstruct_sectors(j, c->block_bits);
785 sectors_read -= sectors;
786 j = ((void *) j) + (sectors << 9);
792 static void bch2_journal_read_device(struct closure *cl)
794 #define read_bucket(b) \
796 bool entries_found = false; \
797 ret = journal_read_bucket(ca, &buf, jlist, b, &seq, \
801 __set_bit(b, bitmap); \
805 struct journal_device *ja =
806 container_of(cl, struct journal_device, read);
807 struct bch_dev *ca = container_of(ja, struct bch_dev, journal);
808 struct journal_list *jlist =
809 container_of(cl->parent, struct journal_list, cl);
810 struct request_queue *q = bdev_get_queue(ca->disk_sb.bdev);
811 struct journal_read_buf buf = { NULL, 0 };
813 DECLARE_BITMAP(bitmap, ja->nr);
821 bitmap_zero(bitmap, ja->nr);
822 ret = journal_read_buf_realloc(&buf, PAGE_SIZE);
826 pr_debug("%u journal buckets", ja->nr);
829 * If the device supports discard but not secure discard, we can't do
830 * the fancy fibonacci hash/binary search because the live journal
831 * entries might not form a contiguous range:
833 for (i = 0; i < ja->nr; i++)
837 if (!blk_queue_nonrot(q))
841 * Read journal buckets ordered by golden ratio hash to quickly
842 * find a sequence of buckets with valid journal entries
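*
* 2654435769 is floor(2^32 / golden ratio), the Fibonacci hashing
* constant: successive multiples taken mod 2^32 land spread roughly
* evenly across the index space, so a run of buckets holding live
* journal entries tends to be found within a few probes.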
844 for (i = 0; i < ja->nr; i++) {
845 l = (i * 2654435769U) % ja->nr;
847 if (test_bit(l, bitmap))
855 * If that fails, check all the buckets we haven't checked
858 pr_debug("falling back to linear search");
860 for (l = find_first_zero_bit(bitmap, ja->nr);
862 l = find_next_zero_bit(bitmap, ja->nr, l + 1))
866 /* no journal entries on this device? */
871 r = find_next_bit(bitmap, ja->nr, l + 1);
872 pr_debug("starting binary search, l %u r %u", l, r);
875 unsigned m = (l + r) >> 1;
888 * Find the journal bucket with the highest sequence number:
890 * If there's duplicate journal entries in multiple buckets (which
891 * definitely isn't supposed to happen, but...) - make sure to start
892 * cur_idx at the last of those buckets, so we don't deadlock trying to
897 for (i = 0; i < ja->nr; i++)
898 if (ja->bucket_seq[i] >= seq &&
899 ja->bucket_seq[i] != ja->bucket_seq[(i + 1) % ja->nr]) {
901 * When journal_next_bucket() goes to allocate for
902 * the first time, it'll use the bucket after
906 seq = ja->bucket_seq[i];
910 * Set last_idx to indicate the entire journal is full and needs to be
911 * reclaimed - journal reclaim will immediately reclaim whatever isn't
912 * pinned when it first runs:
914 ja->last_idx = (ja->cur_idx + 1) % ja->nr;
917 * Read buckets in reverse order until we stop finding more journal
920 for (i = (ja->cur_idx + ja->nr - 1) % ja->nr;
922 i = (i + ja->nr - 1) % ja->nr)
923 if (!test_bit(i, bitmap) &&
927 kvpfree(buf.data, buf.size);
928 percpu_ref_put(&ca->io_ref);
931 mutex_lock(&jlist->lock);
933 mutex_unlock(&jlist->lock);
938 void bch2_journal_entries_free(struct list_head *list)
941 while (!list_empty(list)) {
942 struct journal_replay *i =
943 list_first_entry(list, struct journal_replay, list);
945 kvpfree(i, offsetof(struct journal_replay, j) +
946 vstruct_bytes(&i->j));
950 static int journal_seq_blacklist_read(struct journal *j,
951 struct journal_replay *i,
952 struct journal_entry_pin_list *p)
954 struct bch_fs *c = container_of(j, struct bch_fs, journal);
955 struct jset_entry *entry;
956 struct journal_seq_blacklist *bl;
959 for_each_jset_entry_type(entry, &i->j,
960 JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
961 struct jset_entry_blacklist *bl_entry =
962 container_of(entry, struct jset_entry_blacklist, entry);
963 seq = le64_to_cpu(bl_entry->seq);
965 bch_verbose(c, "blacklisting existing journal seq %llu", seq);
967 bl = bch2_journal_seq_blacklisted_new(j, seq);
971 journal_pin_add_entry(j, p, &bl->pin,
972 journal_seq_blacklist_flush);
979 static inline bool journal_has_keys(struct list_head *list)
981 struct journal_replay *i;
982 struct jset_entry *entry;
983 struct bkey_i *k, *_n;
985 list_for_each_entry(i, list, list)
986 for_each_jset_key(k, _n, entry, &i->j)
992 int bch2_journal_read(struct bch_fs *c, struct list_head *list)
994 struct journal *j = &c->journal;
995 struct journal_list jlist;
996 struct journal_replay *i;
997 struct journal_entry_pin_list *p;
999 u64 cur_seq, end_seq, seq;
1000 unsigned iter, keys = 0, entries = 0;
1002 bool degraded = false;
1005 closure_init_stack(&jlist.cl);
1006 mutex_init(&jlist.lock);
1010 for_each_member_device(ca, c, iter) {
1011 if (!(bch2_dev_has_data(c, ca) & (1 << BCH_DATA_JOURNAL)))
1014 if ((ca->mi.state == BCH_MEMBER_STATE_RW ||
1015 ca->mi.state == BCH_MEMBER_STATE_RO) &&
1016 percpu_ref_tryget(&ca->io_ref))
1017 closure_call(&ca->journal.read,
1018 bch2_journal_read_device,
1025 closure_sync(&jlist.cl);
1030 if (list_empty(list)) {
1031 bch_err(c, "no journal entries found");
1032 return BCH_FSCK_REPAIR_IMPOSSIBLE;
1035 fsck_err_on(c->sb.clean && journal_has_keys(list), c,
1036 "filesystem marked clean but journal has keys to replay");
1038 list_for_each_entry(i, list, list) {
1039 ret = journal_entry_validate_entries(c, &i->j, READ);
1044 * If we're mounting in degraded mode - if we didn't read all
1045 * the devices - this is wrong:
1049 (test_bit(BCH_FS_REBUILD_REPLICAS, &c->flags) ||
1050 fsck_err_on(!bch2_replicas_marked(c, BCH_DATA_JOURNAL,
1052 "superblock not marked as containing replicas (type %u)",
1053 BCH_DATA_JOURNAL))) {
1054 ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, i->devs);
1060 i = list_last_entry(list, struct journal_replay, list);
1062 nr = le64_to_cpu(i->j.seq) - le64_to_cpu(i->j.last_seq) + 1;
1064 if (nr > j->pin.size) {
1066 init_fifo(&j->pin, roundup_pow_of_two(nr), GFP_KERNEL);
1068 bch_err(c, "error reallocating journal fifo (%zu open entries)", nr);
1073 atomic64_set(&j->seq, le64_to_cpu(i->j.seq));
1074 j->last_seq_ondisk = le64_to_cpu(i->j.last_seq);
1076 j->pin.front = le64_to_cpu(i->j.last_seq);
1077 j->pin.back = le64_to_cpu(i->j.seq) + 1;
1079 fifo_for_each_entry_ptr(p, &j->pin, seq) {
1080 INIT_LIST_HEAD(&p->list);
1081 INIT_LIST_HEAD(&p->flushed);
1082 atomic_set(&p->count, 0);
1086 mutex_lock(&j->blacklist_lock);
1088 list_for_each_entry(i, list, list) {
1089 p = journal_seq_pin(j, le64_to_cpu(i->j.seq));
1091 atomic_set(&p->count, 1);
1094 if (journal_seq_blacklist_read(j, i, p)) {
1095 mutex_unlock(&j->blacklist_lock);
1100 mutex_unlock(&j->blacklist_lock);
1102 cur_seq = journal_last_seq(j);
1103 end_seq = le64_to_cpu(list_last_entry(list,
1104 struct journal_replay, list)->j.seq);
1106 list_for_each_entry(i, list, list) {
1107 struct jset_entry *entry;
1108 struct bkey_i *k, *_n;
1111 mutex_lock(&j->blacklist_lock);
1112 while (cur_seq < le64_to_cpu(i->j.seq) &&
1113 journal_seq_blacklist_find(j, cur_seq))
1116 blacklisted = journal_seq_blacklist_find(j,
1117 le64_to_cpu(i->j.seq));
1118 mutex_unlock(&j->blacklist_lock);
1120 fsck_err_on(blacklisted, c,
1121 "found blacklisted journal entry %llu",
1122 le64_to_cpu(i->j.seq));
1124 fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
1125 "journal entries %llu-%llu missing! (replaying %llu-%llu)",
1126 cur_seq, le64_to_cpu(i->j.seq) - 1,
1127 journal_last_seq(j), end_seq);
1129 cur_seq = le64_to_cpu(i->j.seq) + 1;
1131 for_each_jset_key(k, _n, entry, &i->j)
1136 bch_info(c, "journal read done, %u keys in %u entries, seq %llu",
1137 keys, entries, journal_cur_seq(j));
1142 int bch2_journal_mark(struct bch_fs *c, struct list_head *list)
1144 struct bkey_i *k, *n;
1145 struct jset_entry *j;
1146 struct journal_replay *r;
1149 list_for_each_entry(r, list, list)
1150 for_each_jset_key(k, n, j, &r->j) {
1151 enum bkey_type type = bkey_type(j->level, j->btree_id);
1152 struct bkey_s_c k_s_c = bkey_i_to_s_c(k);
1154 if (btree_type_has_ptrs(type)) {
1155 ret = bch2_btree_mark_key_initial(c, type, k_s_c);
1164 static bool journal_entry_is_open(struct journal *j)
1166 return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
1169 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
1171 struct journal_buf *w = journal_prev_buf(j);
1173 atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
1175 if (!need_write_just_set &&
1176 test_bit(JOURNAL_NEED_WRITE, &j->flags))
1177 __bch2_time_stats_update(j->delay_time,
1178 j->need_write_time);
1180 closure_call(&j->io, journal_write, NULL, NULL);
1182 /* Shut sparse up: */
1183 closure_init(&j->io, NULL);
1184 set_closure_fn(&j->io, journal_write, NULL);
1185 journal_write(&j->io);
1189 static void journal_pin_new_entry(struct journal *j, int count)
1191 struct journal_entry_pin_list *p;
1194 * The fifo_push() needs to happen at the same time as j->seq is
1195 * incremented for journal_last_seq() to be calculated correctly
1197 atomic64_inc(&j->seq);
1198 p = fifo_push_ref(&j->pin);
1200 INIT_LIST_HEAD(&p->list);
1201 INIT_LIST_HEAD(&p->flushed);
1202 atomic_set(&p->count, count);
1206 static void bch2_journal_buf_init(struct journal *j)
1208 struct journal_buf *buf = journal_cur_buf(j);
1210 memset(buf->has_inode, 0, sizeof(buf->has_inode));
1212 memset(buf->data, 0, sizeof(*buf->data));
1213 buf->data->seq = cpu_to_le64(journal_cur_seq(j));
1214 buf->data->u64s = 0;
1217 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
1219 return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
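/*
 * i.e. room for one btree root entry (header plus a maximum-size extent
 * key) per btree: roots aren't added until just before the write (see
 * journal_write()), so the space has to be reserved up front.
 */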
1223 JOURNAL_ENTRY_ERROR,
1224 JOURNAL_ENTRY_INUSE,
1225 JOURNAL_ENTRY_CLOSED,
1227 } journal_buf_switch(struct journal *j, bool need_write_just_set)
1229 struct bch_fs *c = container_of(j, struct bch_fs, journal);
1230 struct journal_buf *buf;
1231 union journal_res_state old, new;
1232 u64 v = atomic64_read(&j->reservations.counter);
1234 lockdep_assert_held(&j->lock);
1238 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
1239 return JOURNAL_ENTRY_CLOSED;
1241 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1242 return JOURNAL_ENTRY_ERROR;
1244 if (new.prev_buf_unwritten)
1245 return JOURNAL_ENTRY_INUSE;
1248 * avoid race between setting buf->data->u64s and
1249 * journal_res_put starting write:
1251 journal_state_inc(&new);
1253 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
1255 new.prev_buf_unwritten = 1;
1257 BUG_ON(journal_state_count(new, new.idx));
1258 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1259 old.v, new.v)) != old.v);
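/*
 * The entire reservation state (buffer index, current entry offset,
 * per-buffer reference counts) is packed into one 64-bit word, so the
 * loop above is the usual lock-free read-modify-write: reread, modify,
 * retry until the cmpxchg lands.
 */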
1261 clear_bit(JOURNAL_NEED_WRITE, &j->flags);
1263 buf = &j->buf[old.idx];
1264 buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
1266 j->prev_buf_sectors =
1267 vstruct_blocks_plus(buf->data, c->block_bits,
1268 journal_entry_u64s_reserve(buf)) *
1270 BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
1272 journal_reclaim_fast(j);
1273 /* XXX: why set this here, and not in journal_write()? */
1274 buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
1276 journal_pin_new_entry(j, 1);
1278 bch2_journal_buf_init(j);
1280 cancel_delayed_work(&j->write_work);
1281 spin_unlock(&j->lock);
1283 if (c->bucket_journal_seq > 1 << 14) {
1284 c->bucket_journal_seq = 0;
1285 bch2_bucket_seq_cleanup(c);
1288 /* ugh - might be called from __journal_res_get() under wait_event() */
1289 __set_current_state(TASK_RUNNING);
1290 bch2_journal_buf_put(j, old.idx, need_write_just_set);
1292 return JOURNAL_UNLOCKED;
1295 void bch2_journal_halt(struct journal *j)
1297 union journal_res_state old, new;
1298 u64 v = atomic64_read(&j->reservations.counter);
1302 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1305 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
1306 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1307 old.v, new.v)) != old.v);
1310 closure_wake_up(&journal_cur_buf(j)->wait);
1311 closure_wake_up(&journal_prev_buf(j)->wait);
1314 static unsigned journal_dev_buckets_available(struct journal *j,
1317 struct journal_device *ja = &ca->journal;
1318 unsigned next = (ja->cur_idx + 1) % ja->nr;
1319 unsigned available = (ja->last_idx + ja->nr - next) % ja->nr;
1322 * Hack to avoid a deadlock during journal replay:
1323 * journal replay might require setting a new btree
1324 * root, which requires writing another journal entry -
1325 * thus, if the journal is full (and this happens when
1326 * replaying the first journal bucket's entries) we're
1329 * So don't let the journal fill up unless we're in
1332 if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
1333 available = max((int) available - 2, 0);
1336 * Don't use the last bucket unless writing the new last_seq
1337 * will make another bucket available:
1339 if (ja->bucket_seq[ja->last_idx] >= journal_last_seq(j))
1340 available = max((int) available - 1, 0);
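/*
 * Worked example of the ring arithmetic: with ja->nr == 8,
 * cur_idx == 5 and last_idx == 2, next == 6 and available ==
 * (2 + 8 - 6) % 8 == 4 buckets, before the adjustments above.
 */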
1345 /* returns number of sectors available for next journal entry: */
1346 static int journal_entry_sectors(struct journal *j)
1348 struct bch_fs *c = container_of(j, struct bch_fs, journal);
1350 struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
1351 unsigned sectors_available = UINT_MAX;
1352 unsigned i, nr_online = 0, nr_devs = 0;
1354 lockdep_assert_held(&j->lock);
1357 for_each_member_device_rcu(ca, c, i,
1358 &c->rw_devs[BCH_DATA_JOURNAL]) {
1359 struct journal_device *ja = &ca->journal;
1360 unsigned buckets_required = 0;
1365 sectors_available = min_t(unsigned, sectors_available,
1366 ca->mi.bucket_size);
1369 * Note that we don't allocate the space for a journal entry
1370 * until we write it out - thus, if we haven't started the write
1371 * for the previous entry we have to make sure we have space for
1374 if (bch2_extent_has_device(e.c, ca->dev_idx)) {
1375 if (j->prev_buf_sectors > ja->sectors_free)
1378 if (j->prev_buf_sectors + sectors_available >
1382 if (j->prev_buf_sectors + sectors_available >
1389 if (journal_dev_buckets_available(j, ca) >= buckets_required)
1395 if (nr_online < c->opts.metadata_replicas_required)
1398 if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
1401 return sectors_available;
1405 * should _only_ be called from journal_res_get() - when we actually want a
1406 * journal reservation - journal entry is open means journal is dirty:
1410 * 0: journal currently full (must wait)
1411 * -EROFS: insufficient rw devices
1412 * -EIO: journal error
1414 static int journal_entry_open(struct journal *j)
1416 struct journal_buf *buf = journal_cur_buf(j);
1417 union journal_res_state old, new;
1422 lockdep_assert_held(&j->lock);
1423 BUG_ON(journal_entry_is_open(j));
1425 if (!fifo_free(&j->pin))
1428 sectors = journal_entry_sectors(j);
1432 buf->disk_sectors = sectors;
1434 sectors = min_t(unsigned, sectors, buf->size >> 9);
1435 j->cur_buf_sectors = sectors;
1437 u64s = (sectors << 9) / sizeof(u64);
1439 /* Subtract the journal header */
1440 u64s -= sizeof(struct jset) / sizeof(u64);
1442 * Btree roots, prio pointers don't get added until right before we do
1445 u64s -= journal_entry_u64s_reserve(buf);
1446 u64s = max_t(ssize_t, 0L, u64s);
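/*
 * e.g. (sketch, assuming 512-byte sectors): an 8-sector entry has
 * (8 << 9) / sizeof(u64) == 512 u64s of raw space, minus the jset
 * header and the btree root reservation computed above.
 */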
1448 BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
1450 if (u64s <= le32_to_cpu(buf->data->u64s))
1454 * Must be set before marking the journal entry as open:
1456 j->cur_entry_u64s = u64s;
1458 v = atomic64_read(&j->reservations.counter);
1462 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1465 /* Handle any already added entries */
1466 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
1467 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1468 old.v, new.v)) != old.v);
1470 if (j->res_get_blocked_start)
1471 __bch2_time_stats_update(j->blocked_time,
1472 j->res_get_blocked_start);
1473 j->res_get_blocked_start = 0;
1475 mod_delayed_work(system_freezable_wq,
1477 msecs_to_jiffies(j->write_delay_ms));
1482 void bch2_journal_start(struct bch_fs *c)
1484 struct journal *j = &c->journal;
1485 struct journal_seq_blacklist *bl;
1488 list_for_each_entry(bl, &j->seq_blacklist, list)
1489 new_seq = max(new_seq, bl->seq);
1491 spin_lock(&j->lock);
1493 set_bit(JOURNAL_STARTED, &j->flags);
1495 while (journal_cur_seq(j) < new_seq)
1496 journal_pin_new_entry(j, 0);
1499 * journal_buf_switch() only inits the next journal entry when it
1500 * closes an open journal entry - the very first journal entry gets
1503 journal_pin_new_entry(j, 1);
1504 bch2_journal_buf_init(j);
1506 spin_unlock(&j->lock);
1509 * Adding entries to the next journal entry before allocating space on
1510 * disk for the next journal entry - this is ok, because these entries
1511 * only have to go down with the next journal entry we write:
1513 list_for_each_entry(bl, &j->seq_blacklist, list)
1515 bch2_journal_add_entry_noreservation(journal_cur_buf(j),
1516 JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED,
1519 journal_pin_add_entry(j,
1520 &fifo_peek_back(&j->pin),
1522 journal_seq_blacklist_flush);
1526 queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
1529 int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
1531 struct journal *j = &c->journal;
1532 struct bkey_i *k, *_n;
1533 struct jset_entry *entry;
1534 struct journal_replay *i, *n;
1537 list_for_each_entry_safe(i, n, list, list) {
1538 j->replay_pin_list =
1539 journal_seq_pin(j, le64_to_cpu(i->j.seq));
1541 for_each_jset_key(k, _n, entry, &i->j) {
1543 if (entry->btree_id == BTREE_ID_ALLOC) {
1545 * allocation code handles replay for
1546 * BTREE_ID_ALLOC keys:
1548 ret = bch2_alloc_replay_key(c, k->k.p);
1551 * We might cause compressed extents to be
1552 * split, so we need to pass in a
1555 struct disk_reservation disk_res =
1556 bch2_disk_reservation_init(c, 0);
1558 ret = bch2_btree_insert(c, entry->btree_id, k,
1559 &disk_res, NULL, NULL,
1560 BTREE_INSERT_NOFAIL|
1561 BTREE_INSERT_JOURNAL_REPLAY);
1565 bch_err(c, "journal replay: error %d while replaying key",
1573 if (atomic_dec_and_test(&j->replay_pin_list->count))
1577 j->replay_pin_list = NULL;
1579 bch2_journal_set_replay_done(j);
1580 ret = bch2_journal_flush_all_pins(j);
1582 bch2_journal_entries_free(list);
1586 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
1587 bool new_fs, struct closure *cl)
1589 struct bch_fs *c = ca->fs;
1590 struct journal_device *ja = &ca->journal;
1591 struct bch_sb_field_journal *journal_buckets;
1592 u64 *new_bucket_seq = NULL, *new_buckets = NULL;
1595 /* don't handle reducing nr of buckets yet: */
1600 new_buckets = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1601 new_bucket_seq = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1602 if (!new_buckets || !new_bucket_seq)
1605 journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1606 nr + sizeof(*journal_buckets) / sizeof(u64));
1607 if (!journal_buckets)
1611 spin_lock(&c->journal.lock);
1613 memcpy(new_buckets, ja->buckets, ja->nr * sizeof(u64));
1614 memcpy(new_bucket_seq, ja->bucket_seq, ja->nr * sizeof(u64));
1615 swap(new_buckets, ja->buckets);
1616 swap(new_bucket_seq, ja->bucket_seq);
1619 spin_unlock(&c->journal.lock);
1621 while (ja->nr < nr) {
1622 struct open_bucket *ob = NULL;
1626 bucket = bch2_bucket_alloc_new_fs(ca);
1632 int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
1634 ret = cl ? -EAGAIN : -ENOSPC;
1638 ob = c->open_buckets + ob_idx;
1639 bucket = sector_to_bucket(ca, ob->ptr.offset);
1643 spin_lock(&c->journal.lock);
1645 __array_insert_item(ja->buckets, ja->nr, ja->last_idx);
1646 __array_insert_item(ja->bucket_seq, ja->nr, ja->last_idx);
1647 __array_insert_item(journal_buckets->buckets, ja->nr, ja->last_idx);
1649 ja->buckets[ja->last_idx] = bucket;
1650 ja->bucket_seq[ja->last_idx] = 0;
1651 journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);
1653 if (ja->last_idx < ja->nr) {
1654 if (ja->cur_idx >= ja->last_idx)
1661 spin_unlock(&c->journal.lock);
1663 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
1665 gc_phase(GC_PHASE_SB),
1667 ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
1671 bch2_open_bucket_put(c, ob);
1676 kfree(new_bucket_seq);
1683 * Allocate more journal space at runtime - not currently making use of it, but
1686 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
1689 struct journal_device *ja = &ca->journal;
1691 unsigned current_nr;
1694 closure_init_stack(&cl);
1697 struct disk_reservation disk_res = { 0, 0 };
1701 mutex_lock(&c->sb_lock);
1702 current_nr = ja->nr;
1705 * note: journal buckets aren't really counted as _sectors_ used yet, so
1706 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
1707 * when space used goes up without a reservation - but we do need the
1708 * reservation to ensure we'll actually be able to allocate:
1711 if (bch2_disk_reservation_get(c, &disk_res,
1712 bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
1713 mutex_unlock(&c->sb_lock);
1717 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
1719 bch2_disk_reservation_put(c, &disk_res);
1721 if (ja->nr != current_nr)
1722 bch2_write_super(c);
1723 mutex_unlock(&c->sb_lock);
1724 } while (ret == -EAGAIN);
1729 int bch2_dev_journal_alloc(struct bch_dev *ca)
1733 if (dynamic_fault("bcachefs:add:journal_alloc"))
1737 * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
1740 nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
1741 BCH_JOURNAL_BUCKETS_MIN,
1743 (1 << 20) / ca->mi.bucket_size));
1745 return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
1751 * journal_reclaim_fast - do the fast part of journal reclaim
1753 * Called from IO submission context, does not block. Cleans up after btree
1754 * write completions by advancing the journal pin and each cache's last_idx,
1755 * kicking off discards and background reclaim as necessary.
1757 static void journal_reclaim_fast(struct journal *j)
1759 struct journal_entry_pin_list temp;
1760 bool popped = false;
1762 lockdep_assert_held(&j->lock);
1765 * Unpin journal entries whose reference counts reached zero, meaning
1766 * all btree nodes got written out
1768 while (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1769 BUG_ON(!list_empty(&fifo_peek_front(&j->pin).list));
1770 BUG_ON(!fifo_pop(&j->pin, temp));
1779 * Journal entry pinning - machinery for holding a reference on a given journal
1780 * entry, marking it as dirty:
1783 static inline void __journal_pin_add(struct journal *j,
1784 struct journal_entry_pin_list *pin_list,
1785 struct journal_entry_pin *pin,
1786 journal_pin_flush_fn flush_fn)
1788 BUG_ON(journal_pin_active(pin));
1789 BUG_ON(!atomic_read(&pin_list->count));
1791 atomic_inc(&pin_list->count);
1792 pin->pin_list = pin_list;
1793 pin->flush = flush_fn;
1796 list_add(&pin->list, &pin_list->list);
1798 INIT_LIST_HEAD(&pin->list);
1801 * If the journal is currently full, we might want to call flush_fn
1807 static void journal_pin_add_entry(struct journal *j,
1808 struct journal_entry_pin_list *pin_list,
1809 struct journal_entry_pin *pin,
1810 journal_pin_flush_fn flush_fn)
1812 spin_lock(&j->lock);
1813 __journal_pin_add(j, pin_list, pin, flush_fn);
1814 spin_unlock(&j->lock);
1817 void bch2_journal_pin_add(struct journal *j,
1818 struct journal_res *res,
1819 struct journal_entry_pin *pin,
1820 journal_pin_flush_fn flush_fn)
1822 struct journal_entry_pin_list *pin_list = res->ref
1823 ? journal_seq_pin(j, res->seq)
1824 : j->replay_pin_list;
1826 spin_lock(&j->lock);
1827 __journal_pin_add(j, pin_list, pin, flush_fn);
1828 spin_unlock(&j->lock);
1831 static inline void __journal_pin_drop(struct journal *j,
1832 struct journal_entry_pin *pin)
1834 struct journal_entry_pin_list *pin_list = pin->pin_list;
1836 if (!journal_pin_active(pin))
1839 pin->pin_list = NULL;
1840 list_del_init(&pin->list);
1843 * Unpinning a journal entry may make journal_next_bucket() succeed, if
1844 * writing a new last_seq will now make another bucket available:
1846 if (atomic_dec_and_test(&pin_list->count) &&
1847 pin_list == &fifo_peek_front(&j->pin))
1848 journal_reclaim_fast(j);
1851 void bch2_journal_pin_drop(struct journal *j,
1852 struct journal_entry_pin *pin)
1854 spin_lock(&j->lock);
1855 __journal_pin_drop(j, pin);
1856 spin_unlock(&j->lock);
1859 void bch2_journal_pin_add_if_older(struct journal *j,
1860 struct journal_entry_pin *src_pin,
1861 struct journal_entry_pin *pin,
1862 journal_pin_flush_fn flush_fn)
1864 spin_lock(&j->lock);
1866 if (journal_pin_active(src_pin) &&
1867 (!journal_pin_active(pin) ||
1868 journal_pin_seq(j, src_pin->pin_list) <
1869 journal_pin_seq(j, pin->pin_list))) {
1870 __journal_pin_drop(j, pin);
1871 __journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
1874 spin_unlock(&j->lock);
1877 static struct journal_entry_pin *
1878 __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1880 struct journal_entry_pin_list *pin_list;
1881 struct journal_entry_pin *ret;
1884 /* no need to iterate over empty fifo entries: */
1885 journal_reclaim_fast(j);
1887 fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
1888 if (iter > seq_to_flush)
1891 ret = list_first_entry_or_null(&pin_list->list,
1892 struct journal_entry_pin, list);
1894 /* must be list_del_init(), see bch2_journal_pin_drop() */
1895 list_move(&ret->list, &pin_list->flushed);
1904 static struct journal_entry_pin *
1905 journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1907 struct journal_entry_pin *ret;
1909 spin_lock(&j->lock);
1910 ret = __journal_get_next_pin(j, seq_to_flush, seq);
1911 spin_unlock(&j->lock);
1916 static int journal_flush_done(struct journal *j, u64 seq_to_flush,
1917 struct journal_entry_pin **pin,
1924 ret = bch2_journal_error(j);
1928 spin_lock(&j->lock);
1930 * If journal replay hasn't completed, the unreplayed journal entries
1931 * hold refs on their corresponding sequence numbers
1933 ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) != NULL ||
1934 !test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
1935 journal_last_seq(j) > seq_to_flush ||
1936 (fifo_used(&j->pin) == 1 &&
1937 atomic_read(&fifo_peek_front(&j->pin).count) == 1);
1938 spin_unlock(&j->lock);
1943 int bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
1945 struct bch_fs *c = container_of(j, struct bch_fs, journal);
1946 struct journal_entry_pin *pin;
1950 if (!test_bit(JOURNAL_STARTED, &j->flags))
1953 wait_event(j->wait, journal_flush_done(j, seq_to_flush, &pin, &pin_seq));
1955 /* flushing a journal pin might cause a new one to be added: */
1956 pin->flush(j, pin, pin_seq);
1960 spin_lock(&j->lock);
1961 flush = journal_last_seq(j) != j->last_seq_ondisk ||
1962 (seq_to_flush == U64_MAX && c->btree_roots_dirty);
1963 spin_unlock(&j->lock);
1965 return flush ? bch2_journal_meta(j) : 0;
1968 int bch2_journal_flush_all_pins(struct journal *j)
1970 return bch2_journal_flush_pins(j, U64_MAX);
1973 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
1977 spin_lock(&j->lock);
1979 (ja->last_idx != ja->cur_idx &&
1980 ja->bucket_seq[ja->last_idx] < j->last_seq_ondisk);
1981 spin_unlock(&j->lock);
1987 * journal_reclaim_work - free up journal buckets
1989 * Background journal reclaim writes out btree nodes. It should be run
1990 * early enough so that we never completely run out of journal buckets.
1992 * High watermarks for triggering background reclaim:
1993 * - FIFO has fewer than 512 entries left
1994 * - fewer than 25% journal buckets free
1996 * Background reclaim runs until low watermarks are reached:
1997 * - FIFO has more than 1024 entries left
1998 * - more than 50% journal buckets free
2000 * As long as a reclaim can complete in the time it takes to fill up
2001 * 512 journal entries or 25% of all journal buckets, then
2002 * journal_next_bucket() should not stall.
2004 static void journal_reclaim_work(struct work_struct *work)
2006 struct bch_fs *c = container_of(to_delayed_work(work),
2007 struct bch_fs, journal.reclaim_work);
2008 struct journal *j = &c->journal;
2010 struct journal_entry_pin *pin;
2011 u64 seq, seq_to_flush = 0;
2012 unsigned iter, bucket_to_flush;
2013 unsigned long next_flush;
2014 bool reclaim_lock_held = false, need_flush;
2017 * Advance last_idx to point to the oldest journal entry containing
2018 * btree node updates that have not yet been written out
2020 for_each_rw_member(ca, c, iter) {
2021 struct journal_device *ja = &ca->journal;
2026 while (should_discard_bucket(j, ja)) {
2027 if (!reclaim_lock_held) {
2030 * might be called from __journal_res_get()
2031 * under wait_event() - have to go back to
2032 * TASK_RUNNING before doing something that
2033 * would block, but only if we're doing work:
2035 __set_current_state(TASK_RUNNING);
2037 mutex_lock(&j->reclaim_lock);
2038 reclaim_lock_held = true;
2039 /* recheck under reclaim_lock: */
2043 if (ca->mi.discard &&
2044 blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
2045 blkdev_issue_discard(ca->disk_sb.bdev,
2046 bucket_to_sector(ca,
2047 ja->buckets[ja->last_idx]),
2048 ca->mi.bucket_size, GFP_NOIO, 0);
2050 spin_lock(&j->lock);
2051 ja->last_idx = (ja->last_idx + 1) % ja->nr;
2052 spin_unlock(&j->lock);
2058 * Write out enough btree nodes to free up 50% journal
2061 spin_lock(&j->lock);
2062 bucket_to_flush = (ja->cur_idx + (ja->nr >> 1)) % ja->nr;
2063 seq_to_flush = max_t(u64, seq_to_flush,
2064 ja->bucket_seq[bucket_to_flush]);
2065 spin_unlock(&j->lock);
2068 if (reclaim_lock_held)
2069 mutex_unlock(&j->reclaim_lock);
2071 /* Also flush if the pin fifo is more than half full */
2072 spin_lock(&j->lock);
2073 seq_to_flush = max_t(s64, seq_to_flush,
2074 (s64) journal_cur_seq(j) -
2075 (j->pin.size >> 1));
2076 spin_unlock(&j->lock);
2079 * If it's been longer than j->reclaim_delay_ms since we last flushed,
2080 * make sure to flush at least one journal pin:
2082 next_flush = j->last_flushed + msecs_to_jiffies(j->reclaim_delay_ms);
2083 need_flush = time_after(jiffies, next_flush);
2085 while ((pin = journal_get_next_pin(j, need_flush
2087 : seq_to_flush, &seq))) {
2088 __set_current_state(TASK_RUNNING);
2089 pin->flush(j, pin, seq);
2092 j->last_flushed = jiffies;
2095 if (!test_bit(BCH_FS_RO, &c->flags))
2096 queue_delayed_work(system_freezable_wq, &j->reclaim_work,
2097 msecs_to_jiffies(j->reclaim_delay_ms));
2101 * journal_next_bucket - move on to the next journal bucket if possible
2103 static int journal_write_alloc(struct journal *j, struct journal_buf *w,
2106 struct bch_fs *c = container_of(j, struct bch_fs, journal);
2107 struct bkey_s_extent e;
2108 struct bch_extent_ptr *ptr;
2109 struct journal_device *ja;
2111 struct dev_alloc_list devs_sorted;
2112 unsigned i, replicas, replicas_want =
2113 READ_ONCE(c->opts.metadata_replicas);
2115 spin_lock(&j->lock);
2116 e = bkey_i_to_s_extent(&j->key);
2119 * Drop any pointers to devices that have been removed, are no longer
2120 * empty, or filled up their current journal bucket:
2122 * Note that a device may have had a small amount of free space (perhaps
2123 * one sector) that wasn't enough for the smallest possible journal
2124 * entry - that's why we drop pointers to devices <= current free space,
2125 * i.e. whichever device was limiting the current journal entry size.
2127 extent_for_each_ptr_backwards(e, ptr) {
2128 ca = bch_dev_bkey_exists(c, ptr->dev);
2130 if (ca->mi.state != BCH_MEMBER_STATE_RW ||
2131 ca->journal.sectors_free <= sectors)
2132 __bch2_extent_drop_ptr(e, ptr);
2134 ca->journal.sectors_free -= sectors;
2137 replicas = bch2_extent_nr_ptrs(e.c);
2140 devs_sorted = bch2_wp_alloc_list(c, &j->wp,
2141 &c->rw_devs[BCH_DATA_JOURNAL]);
2143 for (i = 0; i < devs_sorted.nr; i++) {
2144 ca = rcu_dereference(c->devs[devs_sorted.devs[i]]);
2148 if (!ca->mi.durability)
2155 if (replicas >= replicas_want)
2159 * Check that we can use this device, and aren't already using
2162 if (bch2_extent_has_device(e.c, ca->dev_idx) ||
2163 !journal_dev_buckets_available(j, ca) ||
2164 sectors > ca->mi.bucket_size)
2167 j->wp.next_alloc[ca->dev_idx] += U32_MAX;
2168 bch2_wp_rescale(c, ca, &j->wp);
2170 ja->sectors_free = ca->mi.bucket_size - sectors;
2171 ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
2172 ja->bucket_seq[ja->cur_idx] = le64_to_cpu(w->data->seq);
2174 extent_ptr_append(bkey_i_to_extent(&j->key),
2175 (struct bch_extent_ptr) {
2176 .offset = bucket_to_sector(ca,
2177 ja->buckets[ja->cur_idx]),
2181 replicas += ca->mi.durability;
2185 j->prev_buf_sectors = 0;
2187 bkey_copy(&w->key, &j->key);
2188 spin_unlock(&j->lock);
2190 if (replicas < c->opts.metadata_replicas_required)
2198 static void journal_write_compact(struct jset *jset)
2200 struct jset_entry *i, *next, *prev = NULL;
2203 * Simple compaction, dropping empty jset_entries (from journal
2204 * reservations that weren't fully used) and merging jset_entries that
2207 * If we wanted to be really fancy here, we could sort all the keys in
2208 * the jset and drop keys that were overwritten - probably not worth it:
2210 vstruct_for_each_safe(jset, i, next) {
2211 unsigned u64s = le16_to_cpu(i->u64s);
2217 /* Can we merge with previous entry? */
2219 i->btree_id == prev->btree_id &&
2220 i->level == prev->level &&
2221 i->type == prev->type &&
2222 i->type == JOURNAL_ENTRY_BTREE_KEYS &&
2223 le16_to_cpu(prev->u64s) + u64s <= U16_MAX) {
2224 memmove_u64s_down(vstruct_next(prev),
2227 le16_add_cpu(&prev->u64s, u64s);
2231 /* Couldn't merge, move i into new position (after prev): */
2232 prev = prev ? vstruct_next(prev) : jset->start;
2234 memmove_u64s_down(prev, i, jset_u64s(u64s));
2237 prev = prev ? vstruct_next(prev) : jset->start;
2238 jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
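/*
 * e.g. two adjacent JOURNAL_ENTRY_BTREE_KEYS entries for the same btree
 * and level collapse into one: the second entry's keys are slid down
 * over its header and the first entry's u64s count grows by the
 * second's.
 */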
2241 static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
2243 /* we aren't holding j->lock: */
2244 unsigned new_size = READ_ONCE(j->buf_size_want);
2247 if (buf->size >= new_size)
2250 new_buf = kvpmalloc(new_size, GFP_NOIO|__GFP_NOWARN);
2254 memcpy(new_buf, buf->data, buf->size);
2255 kvpfree(buf->data, buf->size);
2256 buf->data = new_buf;
2257 buf->size = new_size;
2260 static void journal_write_done(struct closure *cl)
2262 struct journal *j = container_of(cl, struct journal, io);
2263 struct bch_fs *c = container_of(j, struct bch_fs, journal);
2264 struct journal_buf *w = journal_prev_buf(j);
2265 struct bch_devs_list devs =
2266 bch2_extent_devs(bkey_i_to_s_c_extent(&w->key));
2269 bch_err(c, "unable to write journal to sufficient devices");
2273 if (bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs))
2276 __bch2_time_stats_update(j->write_time, j->write_start_time);
2278 spin_lock(&j->lock);
2279 j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
2281 journal_seq_pin(j, le64_to_cpu(w->data->seq))->devs = devs;
2284 * Updating last_seq_ondisk may let journal_reclaim_work() discard more
2287 * Must come before signalling write completion, for
2288 * bch2_fs_journal_stop():
2290 mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
2292 /* also must come before signalling write completion: */
2293 closure_debug_destroy(cl);
2295 BUG_ON(!j->reservations.prev_buf_unwritten);
2296 atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
2297 &j->reservations.counter);
2299 closure_wake_up(&w->wait);
2302 if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
2303 mod_delayed_work(system_freezable_wq, &j->write_work, 0);
2304 spin_unlock(&j->lock);
2307 bch2_fatal_error(c);
2308 bch2_journal_halt(j);
2312 static void journal_write_endio(struct bio *bio)
2314 struct bch_dev *ca = bio->bi_private;
2315 struct journal *j = &ca->fs->journal;
2317 if (bch2_dev_io_err_on(bio->bi_status, ca, "journal write") ||
2318 bch2_meta_write_fault("journal")) {
2319 struct journal_buf *w = journal_prev_buf(j);
2320 unsigned long flags;
2322 spin_lock_irqsave(&j->err_lock, flags);
2323 bch2_extent_drop_device(bkey_i_to_s_extent(&w->key), ca->dev_idx);
2324 spin_unlock_irqrestore(&j->err_lock, flags);
2327 closure_put(&j->io);
2328 percpu_ref_put(&ca->io_ref);
2331 static void journal_write(struct closure *cl)
2333 struct journal *j = container_of(cl, struct journal, io);
2334 struct bch_fs *c = container_of(j, struct bch_fs, journal);
2336 struct journal_buf *w = journal_prev_buf(j);
2339 struct bch_extent_ptr *ptr;
2340 unsigned i, sectors, bytes;
2342 journal_buf_realloc(j, w);
2345 j->write_start_time = local_clock();
2346 mutex_lock(&c->btree_root_lock);
2347 for (i = 0; i < BTREE_ID_NR; i++) {
2348 struct btree_root *r = &c->btree_roots[i];
2351 bch2_journal_add_btree_root(w, i, &r->key, r->level);
2353 c->btree_roots_dirty = false;
2354 mutex_unlock(&c->btree_root_lock);
2356 journal_write_compact(jset);
2358 jset->read_clock = cpu_to_le16(c->bucket_clock[READ].hand);
2359 jset->write_clock = cpu_to_le16(c->bucket_clock[WRITE].hand);
2360 jset->magic = cpu_to_le64(jset_magic(c));
2361 jset->version = cpu_to_le32(BCACHE_JSET_VERSION);
2363 SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
2364 SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
2366 if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2367 journal_entry_validate_entries(c, jset, WRITE))
2370 bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
2371 jset->encrypted_start,
2372 vstruct_end(jset) - (void *) jset->encrypted_start);
2374 jset->csum = csum_vstruct(c, JSET_CSUM_TYPE(jset),
2375 journal_nonce(jset), jset);
2377 if (!bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2378 journal_entry_validate_entries(c, jset, WRITE))
2381 sectors = vstruct_sectors(jset, c->block_bits);
2382 BUG_ON(sectors > j->prev_buf_sectors);
2384 bytes = vstruct_bytes(w->data);
2385 memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
2387 if (journal_write_alloc(j, w, sectors)) {
2388 bch2_journal_halt(j);
2389 bch_err(c, "Unable to allocate journal write");
2390 bch2_fatal_error(c);
2391 continue_at(cl, journal_write_done, system_highpri_wq);
2395 * XXX: we really should just disable the entire journal in nochanges
2398 if (c->opts.nochanges)
2401 extent_for_each_ptr(bkey_i_to_s_extent(&w->key), ptr) {
2402 ca = bch_dev_bkey_exists(c, ptr->dev);
2403 if (!percpu_ref_tryget(&ca->io_ref)) {
2405 bch_err(c, "missing device for journal write");
2409 this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_JOURNAL],
2412 bio = ca->journal.bio;
2414 bio_set_dev(bio, ca->disk_sb.bdev);
2415 bio->bi_iter.bi_sector = ptr->offset;
2416 bio->bi_iter.bi_size = sectors << 9;
2417 bio->bi_end_io = journal_write_endio;
2418 bio->bi_private = ca;
2419 bio_set_op_attrs(bio, REQ_OP_WRITE,
2420 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
2421 bch2_bio_map(bio, jset);
2423 trace_journal_write(bio);
2424 closure_bio_submit(bio, cl);
2426 ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
2429 for_each_rw_member(ca, c, i)
2430 if (journal_flushes_device(ca) &&
2431 !bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), i)) {
2432 percpu_ref_get(&ca->io_ref);
2434 bio = ca->journal.bio;
2436 bio_set_dev(bio, ca->disk_sb.bdev);
2437 bio->bi_opf = REQ_OP_FLUSH;
2438 bio->bi_end_io = journal_write_endio;
2439 bio->bi_private = ca;
2440 closure_bio_submit(bio, cl);
2444 extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr)
2445 ptr->offset += sectors;
2447 continue_at(cl, journal_write_done, system_highpri_wq);
2449 bch2_inconsistent_error(c);
2450 continue_at(cl, journal_write_done, system_highpri_wq);
2454 * returns true if there's nothing to flush and no journal write still in flight
2456 static bool journal_flush_write(struct journal *j)
2460 spin_lock(&j->lock);
2461 ret = !j->reservations.prev_buf_unwritten;
2463 if (!journal_entry_is_open(j)) {
2464 spin_unlock(&j->lock);
2468 set_bit(JOURNAL_NEED_WRITE, &j->flags);
2469 if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
2472 spin_unlock(&j->lock);
2476 static void journal_write_work(struct work_struct *work)
2478 struct journal *j = container_of(work, struct journal, write_work.work);
2480 journal_flush_write(j);
2484 * Given an inode number, if that inode number has data in the journal that
2485 * hasn't yet been flushed, return the journal sequence number that needs to be
2488 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
2490 size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
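/*
 * has_inode works like a tiny per-buffer bloom filter: each inode maps
 * to a single bit, so a false positive merely flushes a journal entry
 * that didn't strictly need it, while a clear bit proves the inode has
 * no unflushed keys in that buffer.
 */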
2493 if (!test_bit(h, j->buf[0].has_inode) &&
2494 !test_bit(h, j->buf[1].has_inode))
2497 spin_lock(&j->lock);
2498 if (test_bit(h, journal_cur_buf(j)->has_inode))
2499 seq = journal_cur_seq(j);
2500 else if (test_bit(h, journal_prev_buf(j)->has_inode))
2501 seq = journal_cur_seq(j) - 1;
2502 spin_unlock(&j->lock);
static int __journal_res_get(struct journal *j, struct journal_res *res,
			     unsigned u64s_min, unsigned u64s_max)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	int ret;
retry:
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret)
		return ret;

	spin_lock(&j->lock);
	/*
	 * Recheck after taking the lock, so we don't race with another thread
	 * that just did journal_entry_open() and call journal_entry_close()
	 * unnecessarily
	 */
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret) {
		spin_unlock(&j->lock);
		return 1;
	}

	/*
	 * If we couldn't get a reservation because the current buf filled up,
	 * and we had room for a bigger entry on disk, signal that we want to
	 * realloc the journal bufs:
	 */
	buf = journal_cur_buf(j);
	if (journal_entry_is_open(j) &&
	    buf->size >> 9 < buf->disk_sectors &&
	    buf->size < JOURNAL_ENTRY_SIZE_MAX)
		j->buf_size_want = max(j->buf_size_want, buf->size << 1);

	/*
	 * Close the current journal entry if necessary, then try to start a
	 * new one:
	 */
	switch (journal_buf_switch(j, false)) {
	case JOURNAL_ENTRY_ERROR:
		spin_unlock(&j->lock);
		return -EROFS;
	case JOURNAL_ENTRY_INUSE:
		/* haven't finished writing out the previous one: */
		spin_unlock(&j->lock);
		trace_journal_entry_full(c);
		goto blocked;
	case JOURNAL_ENTRY_CLOSED:
		break;
	case JOURNAL_UNLOCKED:
		goto retry;
	}

	/* We now have a new, closed journal buf - see if we can open it: */
	ret = journal_entry_open(j);
	spin_unlock(&j->lock);

	if (ret < 0)
		return ret;
	if (ret)
		goto retry;

	/* Journal's full, we have to wait */

	/*
	 * Direct reclaim - can't rely on reclaim from work item due to
	 * freezing:
	 */
	journal_reclaim_work(&j->reclaim_work.work);

	trace_journal_full(c);
blocked:
	if (!j->res_get_blocked_start)
		j->res_get_blocked_start = local_clock() ?: 1;
	return 0;
}

/*
 * Essentially the entry function to the journaling code. When bcachefs is
 * doing a btree insert, it calls this function to get the current journal
 * write. The journal write is the structure used to set up journal writes;
 * the calling function will then add its keys to the structure, queuing them
 * for the next write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
				  unsigned u64s_min, unsigned u64s_max)
{
	int ret;

	wait_event(j->wait,
		   (ret = __journal_res_get(j, res, u64s_min, u64s_max)));
	return ret < 0 ? ret : 0;
}

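/*
 * Illustrative sketch of the reservation API (mirroring bch2_journal_meta()
 * below): a caller reserves space in the current entry, copies its keys into
 * the buffer via the reservation, drops the reservation, and may then flush
 * that sequence number:
 *
 *	struct journal_res res;
 *	unsigned u64s = jset_u64s(0);
 *	int ret;
 *
 *	memset(&res, 0, sizeof(res));
 *	ret = bch2_journal_res_get(j, &res, u64s, u64s);
 *	if (ret)
 *		return ret;
 *	... copy keys into the reserved space ...
 *	bch2_journal_res_put(j, &res);
 *	ret = bch2_journal_flush_seq(j, res.seq);
 */

/*
 * Returns the sequence number of the oldest journal entry that hasn't yet
 * been written out - the current entry, or the previous one if its write is
 * still in flight:
 */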
u64 bch2_journal_last_unwritten_seq(struct journal *j)
{
	u64 seq;

	spin_lock(&j->lock);
	seq = journal_cur_seq(j);
	if (j->reservations.prev_buf_unwritten)
		seq--;
	spin_unlock(&j->lock);

	return seq;
}

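/*
 * bch2_journal_open_seq_async - try to ensure journal entry @seq is open
 *
 * Returns 1 if @seq is already open or in the past; otherwise attempts to
 * open it, and on failure waits @parent on j->async_wait and kicks journal
 * reclaim so the caller can retry when woken.
 */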
int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	int ret;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq < journal_cur_seq(j) ||
	    journal_entry_is_open(j)) {
		spin_unlock(&j->lock);
		return 1;
	}

	ret = journal_entry_open(j);
	if (!ret)
		closure_wait(&j->async_wait, parent);
	spin_unlock(&j->lock);

	if (!ret)
		journal_reclaim_work(&j->reclaim_work.work);

	return ret;
}

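/*
 * bch2_journal_wait_on_seq - wait for journal entry @seq to be written
 *
 * Does not itself trigger the write - if nothing else causes the entry to be
 * filled up or flushed, the wait can last up to j->write_delay_ms.
 */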
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		if (!closure_wait(&journal_cur_buf(j)->wait, parent))
			BUG();
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		if (!closure_wait(&journal_prev_buf(j)->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&journal_prev_buf(j)->wait);
	}

	spin_unlock(&j->lock);
}

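/*
 * bch2_journal_flush_seq_async - wait for journal entry @seq to be written,
 * like bch2_journal_wait_on_seq(), but also triggering the write immediately
 * if necessary:
 */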
void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	struct journal_buf *buf;

	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		buf = journal_cur_buf(j);

		if (parent && !closure_wait(&buf->wait, parent))
			BUG();

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			if (parent)
				closure_wake_up(&buf->wait);
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return;
		}
	} else if (parent &&
		   seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		buf = journal_prev_buf(j);

		if (!closure_wait(&buf->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&buf->wait);
	}

	spin_unlock(&j->lock);
}

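/*
 * Condition for the wait_event_killable() in bch2_journal_flush_seq():
 * returns 1 once journal entry @seq has been written, a negative error code
 * if the journal has hit an error, and 0 while the write is still
 * outstanding. Also kicks off the write if it hasn't started yet.
 */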
static int journal_seq_flushed(struct journal *j, u64 seq)
{
	struct journal_buf *buf;
	int ret = 1;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		ret = 0;

		buf = journal_cur_buf(j);

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			ret = -EIO;
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return 0;
		}
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten)
		ret = bch2_journal_error(j);

	spin_unlock(&j->lock);

	return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
	u64 start_time = local_clock();
	int ret, ret2;

	ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));

	bch2_time_stats_update(j->flush_seq_time, start_time);

	return ret ?: ret2 < 0 ? ret2 : 0;
}

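/*
 * bch2_journal_meta_async - force a (possibly empty) journal entry to be
 * written, by taking and immediately releasing a zero-size reservation:
 */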
void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);

	memset(&res, 0, sizeof(res));

	bch2_journal_res_get(j, &res, u64s, u64s);
	bch2_journal_res_put(j, &res);

	bch2_journal_flush_seq_async(j, res.seq, parent);
}

int bch2_journal_meta(struct journal *j)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);
	int ret;

	memset(&res, 0, sizeof(res));

	ret = bch2_journal_res_get(j, &res, u64s, u64s);
	if (ret)
		return ret;

	bch2_journal_res_put(j, &res);

	return bch2_journal_flush_seq(j, res.seq);
}

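/*
 * bch2_journal_flush_async - if there is an open journal entry, or one being
 * written, write it and wait for the write to complete:
 */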
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return;
	}
	spin_unlock(&j->lock);

	bch2_journal_flush_seq_async(j, seq, parent);
}

int bch2_journal_flush(struct journal *j)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return 0;
	}
	spin_unlock(&j->lock);

	return bch2_journal_flush_seq(j, seq);
}

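/*
 * bch2_journal_flush_device - flush the journal with respect to one device:
 * force out every journal entry that has data on @dev_idx (or, if @dev_idx is
 * negative, every entry with fewer than metadata_replicas copies), then
 * update the journal replicas tracking for the entries that remain pinned.
 */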
int bch2_journal_flush_device(struct journal *j, int dev_idx)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_entry_pin_list *p;
	struct bch_devs_list devs;
	u64 iter, seq = 0;
	int ret = 0;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(p, &j->pin, iter)
		if (dev_idx >= 0
		    ? bch2_dev_list_has_dev(p->devs, dev_idx)
		    : p->devs.nr < c->opts.metadata_replicas)
			seq = iter;
	spin_unlock(&j->lock);

	ret = bch2_journal_flush_pins(j, seq);
	if (ret)
		return ret;

	mutex_lock(&c->replicas_gc_lock);
	bch2_replicas_gc_start(c, 1 << BCH_DATA_JOURNAL);

	seq = 0;

	spin_lock(&j->lock);
	while (!ret && seq < j->pin.back) {
		seq = max(seq, journal_last_seq(j));
		devs = journal_seq_pin(j, seq)->devs;
		seq++;

		spin_unlock(&j->lock);
		ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs);
		spin_lock(&j->lock);
	}
	spin_unlock(&j->lock);

	ret = bch2_replicas_gc_end(c, ret);
	mutex_unlock(&c->replicas_gc_lock);

	return ret;
}

/* startup/shutdown: */

static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
	union journal_res_state state;
	struct journal_buf *w;
	bool ret;

	spin_lock(&j->lock);
	state = READ_ONCE(j->reservations);
	w = j->buf + !state.idx;

	ret = state.prev_buf_unwritten &&
		bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
	spin_unlock(&j->lock);

	return ret;
}

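/*
 * Stop issuing journal writes to a device: drop it from the journal write
 * key, then wait for any write currently targeting it to complete:
 */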
void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
	spin_lock(&j->lock);
	bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
	spin_unlock(&j->lock);

	wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
	wait_event(j->wait, journal_flush_write(j));

	cancel_delayed_work_sync(&j->write_work);
	cancel_delayed_work_sync(&j->reclaim_work);
}

void bch2_dev_journal_exit(struct bch_dev *ca)
{
	kfree(ca->journal.bio);
	kfree(ca->journal.buckets);
	kfree(ca->journal.bucket_seq);

	ca->journal.bio		= NULL;
	ca->journal.buckets	= NULL;
	ca->journal.bucket_seq	= NULL;
}

int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets =
		bch2_sb_get_journal(sb);
	unsigned i;

	ja->nr = bch2_nr_journal_buckets(journal_buckets);

	ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->bucket_seq)
		return -ENOMEM;

	ca->journal.bio = bio_kmalloc(GFP_KERNEL,
			DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
	if (!ca->journal.bio)
		return -ENOMEM;

	ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->buckets)
		return -ENOMEM;

	for (i = 0; i < ja->nr; i++)
		ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);

	return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
	kvpfree(j->buf[1].data, j->buf[1].size);
	kvpfree(j->buf[0].data, j->buf[0].size);
	free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	static struct lock_class_key res_key;
	int ret = 0;

	pr_verbose_init(c->opts, "");

	spin_lock_init(&j->lock);
	spin_lock_init(&j->err_lock);
	init_waitqueue_head(&j->wait);
	INIT_DELAYED_WORK(&j->write_work, journal_write_work);
	INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
	mutex_init(&j->blacklist_lock);
	INIT_LIST_HEAD(&j->seq_blacklist);
	mutex_init(&j->reclaim_lock);

	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

	j->buf[0].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->buf[1].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->write_delay_ms	= 1000;
	j->reclaim_delay_ms	= 100;

	bkey_extent_init(&j->key);

	atomic64_set(&j->reservations.counter,
		((union journal_res_state)
		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
	    !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
	    !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
		ret = -ENOMEM;
		goto out;
	}

	j->pin.front = j->pin.back = 1;
out:
	pr_verbose_init(c->opts, "ret %i", ret);
	return ret;
}

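/* debug: */
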
ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	union journal_res_state *s = &j->reservations;
	struct bch_dev *ca;
	unsigned iter;
	ssize_t ret = 0;

	rcu_read_lock();
	spin_lock(&j->lock);

	ret += scnprintf(buf + ret, PAGE_SIZE - ret,
			 "active journal entries:\t%llu\n"
			 "seq:\t\t\t%llu\n"
			 "last_seq:\t\t%llu\n"
			 "last_seq_ondisk:\t%llu\n"
			 "reservation count:\t%u\n"
			 "reservation offset:\t%u\n"
			 "current entry u64s:\t%u\n"
			 "io in flight:\t\t%i\n"
			 "need write:\t\t%i\n"
			 "dirty:\t\t\t%i\n"
			 "replay done:\t\t%i\n",
			 fifo_used(&j->pin),
			 journal_cur_seq(j),
			 journal_last_seq(j),
			 j->last_seq_ondisk,
			 journal_state_count(*s, s->idx),
			 s->cur_entry_offset,
			 j->cur_entry_u64s,
			 s->prev_buf_unwritten,
			 test_bit(JOURNAL_NEED_WRITE, &j->flags),
			 journal_entry_is_open(j),
			 test_bit(JOURNAL_REPLAY_DONE, &j->flags));

	for_each_member_device_rcu(ca, c, iter,
				   &c->rw_devs[BCH_DATA_JOURNAL]) {
		struct journal_device *ja = &ca->journal;

		if (!ja->nr)
			continue;

		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "dev %u:\n"
				 "\tnr\t\t%u\n"
				 "\tcur_idx\t\t%u (seq %llu)\n"
				 "\tlast_idx\t%u (seq %llu)\n",
				 iter, ja->nr,
				 ja->cur_idx,	ja->bucket_seq[ja->cur_idx],
				 ja->last_idx,	ja->bucket_seq[ja->last_idx]);
	}

	spin_unlock(&j->lock);
	rcu_read_unlock();

	return ret;
}

ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;
	ssize_t ret = 0;
	u64 i;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "%llu: count %u\n",
				 i, atomic_read(&pin_list->count));

		list_for_each_entry(pin, &pin_list->list, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);

		if (!list_empty(&pin_list->flushed))
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "flushed:\n");

		list_for_each_entry(pin, &pin_list->flushed, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);
	}
	spin_unlock(&j->lock);

	return ret;
}