]> git.sesse.net Git - bcachefs-tools-debian/blob - libbcachefs/journal.c
Update bcachefs sources to e82e656279 bcachefs: Cleanups for building in userspace
[bcachefs-tools-debian] / libbcachefs / journal.c
1 /*
2  * bcachefs journalling code, for btree insertions
3  *
4  * Copyright 2012 Google, Inc.
5  */
6
7 #include "bcachefs.h"
8 #include "alloc.h"
9 #include "bkey_methods.h"
10 #include "buckets.h"
11 #include "btree_gc.h"
12 #include "btree_update.h"
13 #include "btree_update_interior.h"
14 #include "btree_io.h"
15 #include "checksum.h"
16 #include "debug.h"
17 #include "error.h"
18 #include "extents.h"
19 #include "io.h"
20 #include "keylist.h"
21 #include "journal.h"
22 #include "super-io.h"
23 #include "vstructs.h"
24
25 #include <trace/events/bcachefs.h>
26
27 static void journal_write(struct closure *);
28 static void journal_reclaim_fast(struct journal *);
29 static void journal_pin_add_entry(struct journal *,
30                                   struct journal_entry_pin_list *,
31                                   struct journal_entry_pin *,
32                                   journal_pin_flush_fn);
33
34 static inline struct journal_buf *journal_cur_buf(struct journal *j)
35 {
36         return j->buf + j->reservations.idx;
37 }
38
39 static inline struct journal_buf *journal_prev_buf(struct journal *j)
40 {
41         return j->buf + !j->reservations.idx;
42 }
43
44 /* Sequence number of oldest dirty journal entry */
45
46 static inline u64 last_seq(struct journal *j)
47 {
48         return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1;
49 }
50
51 static inline u64 journal_pin_seq(struct journal *j,
52                                   struct journal_entry_pin_list *pin_list)
53 {
54         return last_seq(j) + fifo_entry_idx(&j->pin, pin_list);
55 }
56
57 static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf,
58                                  unsigned type, enum btree_id id,
59                                  unsigned level,
60                                  const void *data, size_t u64s)
61 {
62         struct jset *jset = buf->data;
63
64         bch2_journal_add_entry_at(buf, le32_to_cpu(jset->u64s),
65                                   type, id, level, data, u64s);
66         le32_add_cpu(&jset->u64s, jset_u64s(u64s));
67 }
68
69 static struct jset_entry *bch2_journal_find_entry(struct jset *j, unsigned type,
70                                                  enum btree_id id)
71 {
72         struct jset_entry *entry;
73
74         for_each_jset_entry_type(entry, j, type)
75                 if (entry->btree_id == id)
76                         return entry;
77
78         return NULL;
79 }
80
81 struct bkey_i *bch2_journal_find_btree_root(struct bch_fs *c, struct jset *j,
82                                            enum btree_id id, unsigned *level)
83 {
84         struct bkey_i *k;
85         struct jset_entry *entry =
86                 bch2_journal_find_entry(j, JOURNAL_ENTRY_BTREE_ROOT, id);
87
88         if (!entry)
89                 return NULL;
90
91         k = entry->start;
92         *level = entry->level;
93         *level = entry->level;
94         return k;
95 }
96
97 static void bch2_journal_add_btree_root(struct journal_buf *buf,
98                                        enum btree_id id, struct bkey_i *k,
99                                        unsigned level)
100 {
101         bch2_journal_add_entry_noreservation(buf,
102                               JOURNAL_ENTRY_BTREE_ROOT, id, level,
103                               k, k->k.u64s);
104 }
105
106 static void journal_seq_blacklist_flush(struct journal *j,
107                                 struct journal_entry_pin *pin, u64 seq)
108 {
109         struct bch_fs *c =
110                 container_of(j, struct bch_fs, journal);
111         struct journal_seq_blacklist *bl =
112                 container_of(pin, struct journal_seq_blacklist, pin);
113         struct blacklisted_node n;
114         struct closure cl;
115         unsigned i;
116         int ret;
117
118         closure_init_stack(&cl);
119
120         for (i = 0;; i++) {
121                 struct btree_iter iter;
122                 struct btree *b;
123
124                 mutex_lock(&j->blacklist_lock);
125                 if (i >= bl->nr_entries) {
126                         mutex_unlock(&j->blacklist_lock);
127                         break;
128                 }
129                 n = bl->entries[i];
130                 mutex_unlock(&j->blacklist_lock);
131
132                 __bch2_btree_iter_init(&iter, c, n.btree_id, n.pos, 0, 0, 0);
133
134                 b = bch2_btree_iter_peek_node(&iter);
135
136                 /* The node might have already been rewritten: */
137
138                 if (b->data->keys.seq == n.seq) {
139                         ret = bch2_btree_node_rewrite(c, &iter, n.seq, 0);
140                         if (ret) {
141                                 bch2_btree_iter_unlock(&iter);
142                                 bch2_fs_fatal_error(c,
143                                         "error %i rewriting btree node with blacklisted journal seq",
144                                         ret);
145                                 bch2_journal_halt(j);
146                                 return;
147                         }
148                 }
149
150                 bch2_btree_iter_unlock(&iter);
151         }
152
153         for (i = 0;; i++) {
154                 struct btree_update *as;
155                 struct pending_btree_node_free *d;
156
157                 mutex_lock(&j->blacklist_lock);
158                 if (i >= bl->nr_entries) {
159                         mutex_unlock(&j->blacklist_lock);
160                         break;
161                 }
162                 n = bl->entries[i];
163                 mutex_unlock(&j->blacklist_lock);
164 redo_wait:
165                 mutex_lock(&c->btree_interior_update_lock);
166
167                 /*
168                  * Is the node on the list of pending interior node updates -
169                  * being freed? If so, wait for that to finish:
170                  */
171                 for_each_pending_btree_node_free(c, as, d)
172                         if (n.seq       == d->seq &&
173                             n.btree_id  == d->btree_id &&
174                             !d->level &&
175                             !bkey_cmp(n.pos, d->key.k.p)) {
176                                 closure_wait(&as->wait, &cl);
177                                 mutex_unlock(&c->btree_interior_update_lock);
178                                 closure_sync(&cl);
179                                 goto redo_wait;
180                         }
181
182                 mutex_unlock(&c->btree_interior_update_lock);
183         }
184
185         mutex_lock(&j->blacklist_lock);
186
187         bch2_journal_pin_drop(j, &bl->pin);
188         list_del(&bl->list);
189         kfree(bl->entries);
190         kfree(bl);
191
192         mutex_unlock(&j->blacklist_lock);
193 }
194
195 static struct journal_seq_blacklist *
196 journal_seq_blacklist_find(struct journal *j, u64 seq)
197 {
198         struct journal_seq_blacklist *bl;
199
200         lockdep_assert_held(&j->blacklist_lock);
201
202         list_for_each_entry(bl, &j->seq_blacklist, list)
203                 if (seq == bl->seq)
204                         return bl;
205
206         return NULL;
207 }
208
209 static struct journal_seq_blacklist *
210 bch2_journal_seq_blacklisted_new(struct journal *j, u64 seq)
211 {
212         struct journal_seq_blacklist *bl;
213
214         lockdep_assert_held(&j->blacklist_lock);
215
216         /*
217          * When we start the journal, bch2_journal_start() will skip over @seq:
218          */
219
220         bl = kzalloc(sizeof(*bl), GFP_KERNEL);
221         if (!bl)
222                 return NULL;
223
224         bl->seq = seq;
225         list_add_tail(&bl->list, &j->seq_blacklist);
226         return bl;
227 }
228
229 /*
230  * Returns true if @seq is newer than the most recent journal entry that got
231  * written, and data corresponding to @seq should be ignored - also marks @seq
232  * as blacklisted so that on future restarts the corresponding data will still
233  * be ignored:
234  */
235 int bch2_journal_seq_should_ignore(struct bch_fs *c, u64 seq, struct btree *b)
236 {
237         struct journal *j = &c->journal;
238         struct journal_seq_blacklist *bl = NULL;
239         struct blacklisted_node *n;
240         u64 journal_seq, i;
241         int ret = 0;
242
243         if (!seq)
244                 return 0;
245
246         journal_seq = atomic64_read(&j->seq);
247
248         /* Interier updates aren't journalled: */
249         BUG_ON(b->level);
250         BUG_ON(seq > journal_seq && test_bit(BCH_FS_INITIAL_GC_DONE, &c->flags));
251
252         /*
253          * Decrease this back to j->seq + 2 when we next rev the on disk format:
254          * increasing it temporarily to work around bug in old kernels
255          */
256         bch2_fs_inconsistent_on(seq > journal_seq + 4, c,
257                          "bset journal seq too far in the future: %llu > %llu",
258                          seq, journal_seq);
259
260         if (seq <= journal_seq &&
261             list_empty_careful(&j->seq_blacklist))
262                 return 0;
263
264         mutex_lock(&j->blacklist_lock);
265
266         if (seq <= journal_seq) {
267                 bl = journal_seq_blacklist_find(j, seq);
268                 if (!bl)
269                         goto out;
270         } else {
271                 bch_verbose(c, "btree node %u:%llu:%llu has future journal sequence number %llu, blacklisting",
272                             b->btree_id, b->key.k.p.inode, b->key.k.p.offset, seq);
273
274                 for (i = journal_seq + 1; i <= seq; i++) {
275                         bl = journal_seq_blacklist_find(j, i) ?:
276                                 bch2_journal_seq_blacklisted_new(j, i);
277                         if (!bl) {
278                                 ret = -ENOMEM;
279                                 goto out;
280                         }
281                 }
282         }
283
284         for (n = bl->entries; n < bl->entries + bl->nr_entries; n++)
285                 if (b->data->keys.seq   == n->seq &&
286                     b->btree_id         == n->btree_id &&
287                     !bkey_cmp(b->key.k.p, n->pos))
288                         goto found_entry;
289
290         if (!bl->nr_entries ||
291             is_power_of_2(bl->nr_entries)) {
292                 n = krealloc(bl->entries,
293                              max(bl->nr_entries * 2, 8UL) * sizeof(*n),
294                              GFP_KERNEL);
295                 if (!n) {
296                         ret = -ENOMEM;
297                         goto out;
298                 }
299                 bl->entries = n;
300         }
301
302         bl->entries[bl->nr_entries++] = (struct blacklisted_node) {
303                 .seq            = b->data->keys.seq,
304                 .btree_id       = b->btree_id,
305                 .pos            = b->key.k.p,
306         };
307 found_entry:
308         ret = 1;
309 out:
310         mutex_unlock(&j->blacklist_lock);
311         return ret;
312 }
313
314 /*
315  * Journal replay/recovery:
316  *
317  * This code is all driven from bch2_fs_start(); we first read the journal
318  * entries, do some other stuff, then we mark all the keys in the journal
319  * entries (same as garbage collection would), then we replay them - reinserting
320  * them into the cache in precisely the same order as they appear in the
321  * journal.
322  *
323  * We only journal keys that go in leaf nodes, which simplifies things quite a
324  * bit.
325  */
326
327 struct journal_list {
328         struct closure          cl;
329         struct mutex            lock;
330         struct list_head        *head;
331         int                     ret;
332 };
333
334 #define JOURNAL_ENTRY_ADD_OK            0
335 #define JOURNAL_ENTRY_ADD_OUT_OF_RANGE  5
336
337 /*
338  * Given a journal entry we just read, add it to the list of journal entries to
339  * be replayed:
340  */
341 static int journal_entry_add(struct bch_fs *c, struct journal_list *jlist,
342                     struct jset *j)
343 {
344         struct journal_replay *i, *pos;
345         struct list_head *where;
346         size_t bytes = vstruct_bytes(j);
347         __le64 last_seq;
348         int ret;
349
350         mutex_lock(&jlist->lock);
351
352         last_seq = !list_empty(jlist->head)
353                 ? list_last_entry(jlist->head, struct journal_replay,
354                                   list)->j.last_seq
355                 : 0;
356
357         /* Is this entry older than the range we need? */
358         if (le64_to_cpu(j->seq) < le64_to_cpu(last_seq)) {
359                 ret = JOURNAL_ENTRY_ADD_OUT_OF_RANGE;
360                 goto out;
361         }
362
363         /* Drop entries we don't need anymore */
364         list_for_each_entry_safe(i, pos, jlist->head, list) {
365                 if (le64_to_cpu(i->j.seq) >= le64_to_cpu(j->last_seq))
366                         break;
367                 list_del(&i->list);
368                 kvpfree(i, offsetof(struct journal_replay, j) +
369                         vstruct_bytes(&i->j));
370         }
371
372         list_for_each_entry_reverse(i, jlist->head, list) {
373                 /* Duplicate? */
374                 if (le64_to_cpu(j->seq) == le64_to_cpu(i->j.seq)) {
375                         fsck_err_on(bytes != vstruct_bytes(&i->j) ||
376                                     memcmp(j, &i->j, bytes), c,
377                                     "found duplicate but non identical journal entries (seq %llu)",
378                                     le64_to_cpu(j->seq));
379
380                         ret = JOURNAL_ENTRY_ADD_OK;
381                         goto out;
382                 }
383
384                 if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
385                         where = &i->list;
386                         goto add;
387                 }
388         }
389
390         where = jlist->head;
391 add:
392         i = kvpmalloc(offsetof(struct journal_replay, j) + bytes, GFP_KERNEL);
393         if (!i) {
394                 ret = -ENOMEM;
395                 goto out;
396         }
397
398         memcpy(&i->j, j, bytes);
399         list_add(&i->list, where);
400         ret = JOURNAL_ENTRY_ADD_OK;
401 out:
402 fsck_err:
403         mutex_unlock(&jlist->lock);
404         return ret;
405 }
406
407 static struct nonce journal_nonce(const struct jset *jset)
408 {
409         return (struct nonce) {{
410                 [0] = 0,
411                 [1] = ((__le32 *) &jset->seq)[0],
412                 [2] = ((__le32 *) &jset->seq)[1],
413                 [3] = BCH_NONCE_JOURNAL,
414         }};
415 }
416
417 static void journal_entry_null_range(void *start, void *end)
418 {
419         struct jset_entry *entry;
420
421         for (entry = start; entry != end; entry = vstruct_next(entry))
422                 memset(entry, 0, sizeof(*entry));
423 }
424
425 static int journal_validate_key(struct bch_fs *c, struct jset *j,
426                                 struct jset_entry *entry,
427                                 struct bkey_i *k, enum bkey_type key_type,
428                                 const char *type)
429 {
430         void *next = vstruct_next(entry);
431         const char *invalid;
432         char buf[160];
433         int ret = 0;
434
435         if (mustfix_fsck_err_on(!k->k.u64s, c,
436                         "invalid %s in journal: k->u64s 0", type)) {
437                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
438                 journal_entry_null_range(vstruct_next(entry), next);
439                 return 0;
440         }
441
442         if (mustfix_fsck_err_on((void *) bkey_next(k) >
443                                 (void *) vstruct_next(entry), c,
444                         "invalid %s in journal: extends past end of journal entry",
445                         type)) {
446                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
447                 journal_entry_null_range(vstruct_next(entry), next);
448                 return 0;
449         }
450
451         if (mustfix_fsck_err_on(k->k.format != KEY_FORMAT_CURRENT, c,
452                         "invalid %s in journal: bad format %u",
453                         type, k->k.format)) {
454                 le16_add_cpu(&entry->u64s, -k->k.u64s);
455                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
456                 journal_entry_null_range(vstruct_next(entry), next);
457                 return 0;
458         }
459
460         if (JSET_BIG_ENDIAN(j) != CPU_BIG_ENDIAN)
461                 bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
462
463         invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
464         if (invalid) {
465                 bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
466                                      bkey_i_to_s_c(k));
467                 mustfix_fsck_err(c, "invalid %s in journal: %s", type, buf);
468
469                 le16_add_cpu(&entry->u64s, -k->k.u64s);
470                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
471                 journal_entry_null_range(vstruct_next(entry), next);
472                 return 0;
473         }
474 fsck_err:
475         return ret;
476 }
477
478 #define JOURNAL_ENTRY_REREAD    5
479 #define JOURNAL_ENTRY_NONE      6
480 #define JOURNAL_ENTRY_BAD       7
481
482 #define journal_entry_err(c, msg, ...)                                  \
483 ({                                                                      \
484         if (write == READ) {                                            \
485                 mustfix_fsck_err(c, msg, ##__VA_ARGS__);                \
486         } else {                                                        \
487                 bch_err(c, "detected corrupt metadata before write:\n"  \
488                         msg, ##__VA_ARGS__);                            \
489                 ret = BCH_FSCK_ERRORS_NOT_FIXED;                        \
490                 goto fsck_err;                                          \
491         }                                                               \
492         true;                                                           \
493 })
494
495 #define journal_entry_err_on(cond, c, msg, ...)                         \
496         ((cond) ? journal_entry_err(c, msg, ##__VA_ARGS__) : false)
497
498 static int __journal_entry_validate(struct bch_fs *c, struct jset *j,
499                                     int write)
500 {
501         struct jset_entry *entry;
502         int ret = 0;
503
504         vstruct_for_each(j, entry) {
505                 struct bkey_i *k;
506
507                 if (journal_entry_err_on(vstruct_next(entry) >
508                                          vstruct_last(j), c,
509                                 "journal entry extends past end of jset")) {
510                         j->u64s = cpu_to_le64((u64 *) entry - j->_data);
511                         break;
512                 }
513
514                 switch (entry->type) {
515                 case JOURNAL_ENTRY_BTREE_KEYS:
516                         vstruct_for_each(entry, k) {
517                                 ret = journal_validate_key(c, j, entry, k,
518                                                 bkey_type(entry->level,
519                                                           entry->btree_id),
520                                                 "key");
521                                 if (ret)
522                                         goto fsck_err;
523                         }
524                         break;
525
526                 case JOURNAL_ENTRY_BTREE_ROOT:
527                         k = entry->start;
528
529                         if (journal_entry_err_on(!entry->u64s ||
530                                         le16_to_cpu(entry->u64s) != k->k.u64s, c,
531                                         "invalid btree root journal entry: wrong number of keys")) {
532                                 journal_entry_null_range(entry,
533                                                 vstruct_next(entry));
534                                 continue;
535                         }
536
537                         ret = journal_validate_key(c, j, entry, k,
538                                                    BKEY_TYPE_BTREE, "btree root");
539                         if (ret)
540                                 goto fsck_err;
541                         break;
542
543                 case JOURNAL_ENTRY_PRIO_PTRS:
544                         break;
545
546                 case JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED:
547                         if (journal_entry_err_on(le16_to_cpu(entry->u64s) != 1, c,
548                                 "invalid journal seq blacklist entry: bad size")) {
549                                 journal_entry_null_range(entry,
550                                                 vstruct_next(entry));
551                         }
552
553                         break;
554                 default:
555                         journal_entry_err(c, "invalid journal entry type %u",
556                                           entry->type);
557                         journal_entry_null_range(entry, vstruct_next(entry));
558                         break;
559                 }
560         }
561
562 fsck_err:
563         return ret;
564 }
565
566 static int journal_entry_validate(struct bch_fs *c,
567                                   struct jset *j, u64 sector,
568                                   unsigned bucket_sectors_left,
569                                   unsigned sectors_read,
570                                   int write)
571 {
572         size_t bytes = vstruct_bytes(j);
573         struct bch_csum csum;
574         int ret = 0;
575
576         if (le64_to_cpu(j->magic) != jset_magic(c))
577                 return JOURNAL_ENTRY_NONE;
578
579         if (le32_to_cpu(j->version) != BCACHE_JSET_VERSION) {
580                 bch_err(c, "unknown journal entry version %u",
581                         le32_to_cpu(j->version));
582                 return BCH_FSCK_UNKNOWN_VERSION;
583         }
584
585         if (journal_entry_err_on(bytes > bucket_sectors_left << 9, c,
586                         "journal entry too big (%zu bytes), sector %lluu",
587                         bytes, sector)) {
588                 /* XXX: note we might have missing journal entries */
589                 return JOURNAL_ENTRY_BAD;
590         }
591
592         if (bytes > sectors_read << 9)
593                 return JOURNAL_ENTRY_REREAD;
594
595         if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(j)), c,
596                         "journal entry with unknown csum type %llu sector %lluu",
597                         JSET_CSUM_TYPE(j), sector))
598                 return JOURNAL_ENTRY_BAD;
599
600         csum = csum_vstruct(c, JSET_CSUM_TYPE(j), journal_nonce(j), j);
601         if (journal_entry_err_on(bch2_crc_cmp(csum, j->csum), c,
602                         "journal checksum bad, sector %llu", sector)) {
603                 /* XXX: retry IO, when we start retrying checksum errors */
604                 /* XXX: note we might have missing journal entries */
605                 return JOURNAL_ENTRY_BAD;
606         }
607
608         bch2_encrypt(c, JSET_CSUM_TYPE(j), journal_nonce(j),
609                     j->encrypted_start,
610                     vstruct_end(j) - (void *) j->encrypted_start);
611
612         if (journal_entry_err_on(le64_to_cpu(j->last_seq) > le64_to_cpu(j->seq), c,
613                         "invalid journal entry: last_seq > seq"))
614                 j->last_seq = j->seq;
615
616         return __journal_entry_validate(c, j, write);
617 fsck_err:
618         return ret;
619 }
620
621 struct journal_read_buf {
622         void            *data;
623         size_t          size;
624 };
625
626 static int journal_read_buf_realloc(struct journal_read_buf *b,
627                                     size_t new_size)
628 {
629         void *n;
630
631         /* the bios are sized for this many pages, max: */
632         if (new_size > JOURNAL_ENTRY_SIZE_MAX)
633                 return -ENOMEM;
634
635         new_size = roundup_pow_of_two(new_size);
636         n = kvpmalloc(new_size, GFP_KERNEL);
637         if (!n)
638                 return -ENOMEM;
639
640         kvpfree(b->data, b->size);
641         b->data = n;
642         b->size = new_size;
643         return 0;
644 }
645
646 static int journal_read_bucket(struct bch_dev *ca,
647                                struct journal_read_buf *buf,
648                                struct journal_list *jlist,
649                                unsigned bucket, u64 *seq, bool *entries_found)
650 {
651         struct bch_fs *c = ca->fs;
652         struct journal_device *ja = &ca->journal;
653         struct bio *bio = ja->bio;
654         struct jset *j = NULL;
655         unsigned sectors, sectors_read = 0;
656         u64 offset = bucket_to_sector(ca, ja->buckets[bucket]),
657             end = offset + ca->mi.bucket_size;
658         bool saw_bad = false;
659         int ret = 0;
660
661         pr_debug("reading %u", bucket);
662
663         while (offset < end) {
664                 if (!sectors_read) {
665 reread:                 sectors_read = min_t(unsigned,
666                                 end - offset, buf->size >> 9);
667
668                         bio_reset(bio);
669                         bio->bi_bdev            = ca->disk_sb.bdev;
670                         bio->bi_iter.bi_sector  = offset;
671                         bio->bi_iter.bi_size    = sectors_read << 9;
672                         bio_set_op_attrs(bio, REQ_OP_READ, 0);
673                         bch2_bio_map(bio, buf->data);
674
675                         ret = submit_bio_wait(bio);
676
677                         if (bch2_dev_io_err_on(ret, ca,
678                                                "journal read from sector %llu",
679                                                offset) ||
680                             bch2_meta_read_fault("journal"))
681                                 return -EIO;
682
683                         j = buf->data;
684                 }
685
686                 ret = journal_entry_validate(c, j, offset,
687                                         end - offset, sectors_read,
688                                         READ);
689                 switch (ret) {
690                 case BCH_FSCK_OK:
691                         break;
692                 case JOURNAL_ENTRY_REREAD:
693                         if (vstruct_bytes(j) > buf->size) {
694                                 ret = journal_read_buf_realloc(buf,
695                                                         vstruct_bytes(j));
696                                 if (ret)
697                                         return ret;
698                         }
699                         goto reread;
700                 case JOURNAL_ENTRY_NONE:
701                         if (!saw_bad)
702                                 return 0;
703                         sectors = c->sb.block_size;
704                         goto next_block;
705                 case JOURNAL_ENTRY_BAD:
706                         saw_bad = true;
707                         sectors = c->sb.block_size;
708                         goto next_block;
709                 default:
710                         return ret;
711                 }
712
713                 /*
714                  * This happens sometimes if we don't have discards on -
715                  * when we've partially overwritten a bucket with new
716                  * journal entries. We don't need the rest of the
717                  * bucket:
718                  */
719                 if (le64_to_cpu(j->seq) < ja->bucket_seq[bucket])
720                         return 0;
721
722                 ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
723
724                 ret = journal_entry_add(c, jlist, j);
725                 switch (ret) {
726                 case JOURNAL_ENTRY_ADD_OK:
727                         *entries_found = true;
728                         break;
729                 case JOURNAL_ENTRY_ADD_OUT_OF_RANGE:
730                         break;
731                 default:
732                         return ret;
733                 }
734
735                 if (le64_to_cpu(j->seq) > *seq)
736                         *seq = le64_to_cpu(j->seq);
737
738                 sectors = vstruct_sectors(j, c->block_bits);
739 next_block:
740                 pr_debug("next");
741                 offset          += sectors;
742                 sectors_read    -= sectors;
743                 j = ((void *) j) + (sectors << 9);
744         }
745
746         return 0;
747 }
748
749 static void bch2_journal_read_device(struct closure *cl)
750 {
751 #define read_bucket(b)                                                  \
752         ({                                                              \
753                 bool entries_found = false;                             \
754                 ret = journal_read_bucket(ca, &buf, jlist, b, &seq,     \
755                                           &entries_found);              \
756                 if (ret)                                                \
757                         goto err;                                       \
758                 __set_bit(b, bitmap);                                   \
759                 entries_found;                                          \
760          })
761
762         struct journal_device *ja =
763                 container_of(cl, struct journal_device, read);
764         struct bch_dev *ca = container_of(ja, struct bch_dev, journal);
765         struct journal_list *jlist =
766                 container_of(cl->parent, struct journal_list, cl);
767         struct request_queue *q = bdev_get_queue(ca->disk_sb.bdev);
768         struct journal_read_buf buf = { NULL, 0 };
769
770         DECLARE_BITMAP(bitmap, ja->nr);
771         unsigned i, l, r;
772         u64 seq = 0;
773         int ret;
774
775         if (!ja->nr)
776                 goto out;
777
778         bitmap_zero(bitmap, ja->nr);
779         ret = journal_read_buf_realloc(&buf, PAGE_SIZE);
780         if (ret)
781                 goto err;
782
783         pr_debug("%u journal buckets", ja->nr);
784
785         /*
786          * If the device supports discard but not secure discard, we can't do
787          * the fancy fibonacci hash/binary search because the live journal
788          * entries might not form a contiguous range:
789          */
790         for (i = 0; i < ja->nr; i++)
791                 read_bucket(i);
792         goto search_done;
793
794         if (!blk_queue_nonrot(q))
795                 goto linear_scan;
796
797         /*
798          * Read journal buckets ordered by golden ratio hash to quickly
799          * find a sequence of buckets with valid journal entries
800          */
801         for (i = 0; i < ja->nr; i++) {
802                 l = (i * 2654435769U) % ja->nr;
803
804                 if (test_bit(l, bitmap))
805                         break;
806
807                 if (read_bucket(l))
808                         goto bsearch;
809         }
810
811         /*
812          * If that fails, check all the buckets we haven't checked
813          * already
814          */
815         pr_debug("falling back to linear search");
816 linear_scan:
817         for (l = find_first_zero_bit(bitmap, ja->nr);
818              l < ja->nr;
819              l = find_next_zero_bit(bitmap, ja->nr, l + 1))
820                 if (read_bucket(l))
821                         goto bsearch;
822
823         /* no journal entries on this device? */
824         if (l == ja->nr)
825                 goto out;
826 bsearch:
827         /* Binary search */
828         r = find_next_bit(bitmap, ja->nr, l + 1);
829         pr_debug("starting binary search, l %u r %u", l, r);
830
831         while (l + 1 < r) {
832                 unsigned m = (l + r) >> 1;
833                 u64 cur_seq = seq;
834
835                 read_bucket(m);
836
837                 if (cur_seq != seq)
838                         l = m;
839                 else
840                         r = m;
841         }
842
843 search_done:
844         /*
845          * Find the journal bucket with the highest sequence number:
846          *
847          * If there's duplicate journal entries in multiple buckets (which
848          * definitely isn't supposed to happen, but...) - make sure to start
849          * cur_idx at the last of those buckets, so we don't deadlock trying to
850          * allocate
851          */
852         seq = 0;
853
854         for (i = 0; i < ja->nr; i++)
855                 if (ja->bucket_seq[i] >= seq &&
856                     ja->bucket_seq[i] != ja->bucket_seq[(i + 1) % ja->nr]) {
857                         /*
858                          * When journal_next_bucket() goes to allocate for
859                          * the first time, it'll use the bucket after
860                          * ja->cur_idx
861                          */
862                         ja->cur_idx = i;
863                         seq = ja->bucket_seq[i];
864                 }
865
866         /*
867          * Set last_idx to indicate the entire journal is full and needs to be
868          * reclaimed - journal reclaim will immediately reclaim whatever isn't
869          * pinned when it first runs:
870          */
871         ja->last_idx = (ja->cur_idx + 1) % ja->nr;
872
873         /*
874          * Read buckets in reverse order until we stop finding more journal
875          * entries:
876          */
877         for (i = (ja->cur_idx + ja->nr - 1) % ja->nr;
878              i != ja->cur_idx;
879              i = (i + ja->nr - 1) % ja->nr)
880                 if (!test_bit(i, bitmap) &&
881                     !read_bucket(i))
882                         break;
883 out:
884         kvpfree(buf.data, buf.size);
885         percpu_ref_put(&ca->io_ref);
886         closure_return(cl);
887 err:
888         mutex_lock(&jlist->lock);
889         jlist->ret = ret;
890         mutex_unlock(&jlist->lock);
891         goto out;
892 #undef read_bucket
893 }
894
895 void bch2_journal_entries_free(struct list_head *list)
896 {
897
898         while (!list_empty(list)) {
899                 struct journal_replay *i =
900                         list_first_entry(list, struct journal_replay, list);
901                 list_del(&i->list);
902                 kvpfree(i, offsetof(struct journal_replay, j) +
903                         vstruct_bytes(&i->j));
904         }
905 }
906
907 static int journal_seq_blacklist_read(struct journal *j,
908                                       struct journal_replay *i,
909                                       struct journal_entry_pin_list *p)
910 {
911         struct bch_fs *c = container_of(j, struct bch_fs, journal);
912         struct jset_entry *entry;
913         struct journal_seq_blacklist *bl;
914         u64 seq;
915
916         for_each_jset_entry_type(entry, &i->j,
917                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
918                 seq = le64_to_cpu(entry->_data[0]);
919
920                 bch_verbose(c, "blacklisting existing journal seq %llu", seq);
921
922                 bl = bch2_journal_seq_blacklisted_new(j, seq);
923                 if (!bl)
924                         return -ENOMEM;
925
926                 journal_pin_add_entry(j, p, &bl->pin,
927                                   journal_seq_blacklist_flush);
928                 bl->written = true;
929         }
930
931         return 0;
932 }
933
934 static inline bool journal_has_keys(struct list_head *list)
935 {
936         struct journal_replay *i;
937         struct jset_entry *entry;
938         struct bkey_i *k, *_n;
939
940         list_for_each_entry(i, list, list)
941                 for_each_jset_key(k, _n, entry, &i->j)
942                         return true;
943
944         return false;
945 }
946
947 int bch2_journal_read(struct bch_fs *c, struct list_head *list)
948 {
949         struct journal *j = &c->journal;
950         struct journal_list jlist;
951         struct journal_replay *i;
952         struct journal_entry_pin_list *p;
953         struct bch_dev *ca;
954         u64 cur_seq, end_seq;
955         unsigned iter, keys = 0, entries = 0;
956         int ret = 0;
957
958         closure_init_stack(&jlist.cl);
959         mutex_init(&jlist.lock);
960         jlist.head = list;
961         jlist.ret = 0;
962
963         for_each_readable_member(ca, c, iter) {
964                 percpu_ref_get(&ca->io_ref);
965                 closure_call(&ca->journal.read,
966                              bch2_journal_read_device,
967                              system_unbound_wq,
968                              &jlist.cl);
969         }
970
971         closure_sync(&jlist.cl);
972
973         if (jlist.ret)
974                 return jlist.ret;
975
976         if (list_empty(list)){
977                 bch_err(c, "no journal entries found");
978                 return BCH_FSCK_REPAIR_IMPOSSIBLE;
979         }
980
981         fsck_err_on(c->sb.clean && journal_has_keys(list), c,
982                     "filesystem marked clean but journal has keys to replay");
983
984         i = list_last_entry(list, struct journal_replay, list);
985
986         unfixable_fsck_err_on(le64_to_cpu(i->j.seq) -
987                         le64_to_cpu(i->j.last_seq) + 1 > j->pin.size, c,
988                         "too many journal entries open for refcount fifo");
989
990         atomic64_set(&j->seq, le64_to_cpu(i->j.seq));
991         j->last_seq_ondisk = le64_to_cpu(i->j.last_seq);
992
993         j->pin.front    = le64_to_cpu(i->j.last_seq);
994         j->pin.back     = le64_to_cpu(i->j.seq) + 1;
995
996         BUG_ON(last_seq(j) != le64_to_cpu(i->j.last_seq));
997         BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
998                &fifo_peek_back(&j->pin));
999
1000         fifo_for_each_entry_ptr(p, &j->pin, iter) {
1001                 INIT_LIST_HEAD(&p->list);
1002                 INIT_LIST_HEAD(&p->flushed);
1003                 atomic_set(&p->count, 0);
1004         }
1005
1006         mutex_lock(&j->blacklist_lock);
1007
1008         list_for_each_entry(i, list, list) {
1009                 p = journal_seq_pin(j, le64_to_cpu(i->j.seq));
1010
1011                 atomic_set(&p->count, 1);
1012
1013                 if (journal_seq_blacklist_read(j, i, p)) {
1014                         mutex_unlock(&j->blacklist_lock);
1015                         return -ENOMEM;
1016                 }
1017         }
1018
1019         mutex_unlock(&j->blacklist_lock);
1020
1021         cur_seq = last_seq(j);
1022         end_seq = le64_to_cpu(list_last_entry(list,
1023                                 struct journal_replay, list)->j.seq);
1024
1025         list_for_each_entry(i, list, list) {
1026                 struct jset_entry *entry;
1027                 struct bkey_i *k, *_n;
1028                 bool blacklisted;
1029
1030                 mutex_lock(&j->blacklist_lock);
1031                 while (cur_seq < le64_to_cpu(i->j.seq) &&
1032                        journal_seq_blacklist_find(j, cur_seq))
1033                         cur_seq++;
1034
1035                 blacklisted = journal_seq_blacklist_find(j,
1036                                                          le64_to_cpu(i->j.seq));
1037                 mutex_unlock(&j->blacklist_lock);
1038
1039                 fsck_err_on(blacklisted, c,
1040                             "found blacklisted journal entry %llu",
1041                             le64_to_cpu(i->j.seq));
1042
1043                 fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
1044                         "journal entries %llu-%llu missing! (replaying %llu-%llu)",
1045                         cur_seq, le64_to_cpu(i->j.seq) - 1,
1046                         last_seq(j), end_seq);
1047
1048                 cur_seq = le64_to_cpu(i->j.seq) + 1;
1049
1050                 for_each_jset_key(k, _n, entry, &i->j)
1051                         keys++;
1052                 entries++;
1053         }
1054
1055         bch_info(c, "journal read done, %i keys in %i entries, seq %llu",
1056                  keys, entries, (u64) atomic64_read(&j->seq));
1057 fsck_err:
1058         return ret;
1059 }
1060
1061 int bch2_journal_mark(struct bch_fs *c, struct list_head *list)
1062 {
1063         struct bkey_i *k, *n;
1064         struct jset_entry *j;
1065         struct journal_replay *r;
1066         int ret;
1067
1068         list_for_each_entry(r, list, list)
1069                 for_each_jset_key(k, n, j, &r->j) {
1070                         enum bkey_type type = bkey_type(j->level, j->btree_id);
1071                         struct bkey_s_c k_s_c = bkey_i_to_s_c(k);
1072
1073                         if (btree_type_has_ptrs(type)) {
1074                                 ret = bch2_btree_mark_key_initial(c, type, k_s_c);
1075                                 if (ret)
1076                                         return ret;
1077                         }
1078                 }
1079
1080         return 0;
1081 }
1082
1083 static bool journal_entry_is_open(struct journal *j)
1084 {
1085         return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
1086 }
1087
1088 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
1089 {
1090         struct journal_buf *w = journal_prev_buf(j);
1091
1092         atomic_dec_bug(&journal_seq_pin(j, w->data->seq)->count);
1093
1094         if (!need_write_just_set &&
1095             test_bit(JOURNAL_NEED_WRITE, &j->flags))
1096                 __bch2_time_stats_update(j->delay_time,
1097                                         j->need_write_time);
1098 #if 0
1099         closure_call(&j->io, journal_write, NULL, NULL);
1100 #else
1101         /* Shut sparse up: */
1102         closure_init(&j->io, NULL);
1103         set_closure_fn(&j->io, journal_write, NULL);
1104         journal_write(&j->io);
1105 #endif
1106 }
1107
1108 static void __journal_entry_new(struct journal *j, int count)
1109 {
1110         struct journal_entry_pin_list *p = fifo_push_ref(&j->pin);
1111
1112         /*
1113          * The fifo_push() needs to happen at the same time as j->seq is
1114          * incremented for last_seq() to be calculated correctly
1115          */
1116         atomic64_inc(&j->seq);
1117
1118         BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
1119                &fifo_peek_back(&j->pin));
1120
1121         INIT_LIST_HEAD(&p->list);
1122         INIT_LIST_HEAD(&p->flushed);
1123         atomic_set(&p->count, count);
1124 }
1125
1126 static void __bch2_journal_next_entry(struct journal *j)
1127 {
1128         struct journal_buf *buf;
1129
1130         __journal_entry_new(j, 1);
1131
1132         buf = journal_cur_buf(j);
1133         memset(buf->has_inode, 0, sizeof(buf->has_inode));
1134
1135         memset(buf->data, 0, sizeof(*buf->data));
1136         buf->data->seq  = cpu_to_le64(atomic64_read(&j->seq));
1137         buf->data->u64s = 0;
1138 }
1139
1140 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
1141 {
1142         return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1143 }
1144
1145 static enum {
1146         JOURNAL_ENTRY_ERROR,
1147         JOURNAL_ENTRY_INUSE,
1148         JOURNAL_ENTRY_CLOSED,
1149         JOURNAL_UNLOCKED,
1150 } journal_buf_switch(struct journal *j, bool need_write_just_set)
1151 {
1152         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1153         struct journal_buf *buf;
1154         union journal_res_state old, new;
1155         u64 v = atomic64_read(&j->reservations.counter);
1156
1157         lockdep_assert_held(&j->lock);
1158
1159         do {
1160                 old.v = new.v = v;
1161                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
1162                         return JOURNAL_ENTRY_CLOSED;
1163
1164                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1165                         return JOURNAL_ENTRY_ERROR;
1166
1167                 if (new.prev_buf_unwritten)
1168                         return JOURNAL_ENTRY_INUSE;
1169
1170                 /*
1171                  * avoid race between setting buf->data->u64s and
1172                  * journal_res_put starting write:
1173                  */
1174                 journal_state_inc(&new);
1175
1176                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
1177                 new.idx++;
1178                 new.prev_buf_unwritten = 1;
1179
1180                 BUG_ON(journal_state_count(new, new.idx));
1181         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1182                                        old.v, new.v)) != old.v);
1183
1184         journal_reclaim_fast(j);
1185
1186         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
1187
1188         buf = &j->buf[old.idx];
1189         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
1190         buf->data->last_seq     = cpu_to_le64(last_seq(j));
1191
1192         j->prev_buf_sectors =
1193                 vstruct_blocks_plus(buf->data, c->block_bits,
1194                                     journal_entry_u64s_reserve(buf)) *
1195                 c->sb.block_size;
1196
1197         BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
1198
1199         __bch2_journal_next_entry(j);
1200
1201         cancel_delayed_work(&j->write_work);
1202         spin_unlock(&j->lock);
1203
1204         if (c->bucket_journal_seq > 1 << 14) {
1205                 c->bucket_journal_seq = 0;
1206                 bch2_bucket_seq_cleanup(c);
1207         }
1208
1209         /* ugh - might be called from __journal_res_get() under wait_event() */
1210         __set_current_state(TASK_RUNNING);
1211         bch2_journal_buf_put(j, old.idx, need_write_just_set);
1212
1213         return JOURNAL_UNLOCKED;
1214 }
1215
1216 void bch2_journal_halt(struct journal *j)
1217 {
1218         union journal_res_state old, new;
1219         u64 v = atomic64_read(&j->reservations.counter);
1220
1221         do {
1222                 old.v = new.v = v;
1223                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1224                         return;
1225
1226                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
1227         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1228                                        old.v, new.v)) != old.v);
1229
1230         wake_up(&j->wait);
1231         closure_wake_up(&journal_cur_buf(j)->wait);
1232         closure_wake_up(&journal_prev_buf(j)->wait);
1233 }
1234
1235 static unsigned journal_dev_buckets_available(struct journal *j,
1236                                               struct bch_dev *ca)
1237 {
1238         struct journal_device *ja = &ca->journal;
1239         unsigned next = (ja->cur_idx + 1) % ja->nr;
1240         unsigned available = (ja->last_idx + ja->nr - next) % ja->nr;
1241
1242         /*
1243          * Hack to avoid a deadlock during journal replay:
1244          * journal replay might require setting a new btree
1245          * root, which requires writing another journal entry -
1246          * thus, if the journal is full (and this happens when
1247          * replaying the first journal bucket's entries) we're
1248          * screwed.
1249          *
1250          * So don't let the journal fill up unless we're in
1251          * replay:
1252          */
1253         if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
1254                 available = max((int) available - 2, 0);
1255
1256         /*
1257          * Don't use the last bucket unless writing the new last_seq
1258          * will make another bucket available:
1259          */
1260         if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
1261                 available = max((int) available - 1, 0);
1262
1263         return available;
1264 }
1265
1266 /* returns number of sectors available for next journal entry: */
1267 static int journal_entry_sectors(struct journal *j)
1268 {
1269         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1270         struct bch_dev *ca;
1271         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
1272         unsigned sectors_available = UINT_MAX;
1273         unsigned i, nr_online = 0, nr_devs = 0;
1274
1275         lockdep_assert_held(&j->lock);
1276
1277         rcu_read_lock();
1278         for_each_member_device_rcu(ca, c, i,
1279                                    &c->rw_devs[BCH_DATA_JOURNAL]) {
1280                 struct journal_device *ja = &ca->journal;
1281                 unsigned buckets_required = 0;
1282
1283                 if (!ja->nr)
1284                         continue;
1285
1286                 sectors_available = min_t(unsigned, sectors_available,
1287                                           ca->mi.bucket_size);
1288
1289                 /*
1290                  * Note that we don't allocate the space for a journal entry
1291                  * until we write it out - thus, if we haven't started the write
1292                  * for the previous entry we have to make sure we have space for
1293                  * it too:
1294                  */
1295                 if (bch2_extent_has_device(e.c, ca->dev_idx)) {
1296                         if (j->prev_buf_sectors > ja->sectors_free)
1297                                 buckets_required++;
1298
1299                         if (j->prev_buf_sectors + sectors_available >
1300                             ja->sectors_free)
1301                                 buckets_required++;
1302                 } else {
1303                         if (j->prev_buf_sectors + sectors_available >
1304                             ca->mi.bucket_size)
1305                                 buckets_required++;
1306
1307                         buckets_required++;
1308                 }
1309
1310                 if (journal_dev_buckets_available(j, ca) >= buckets_required)
1311                         nr_devs++;
1312                 nr_online++;
1313         }
1314         rcu_read_unlock();
1315
1316         if (nr_online < c->opts.metadata_replicas_required)
1317                 return -EROFS;
1318
1319         if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
1320                 return 0;
1321
1322         return sectors_available;
1323 }
1324
1325 /*
1326  * should _only_ called from journal_res_get() - when we actually want a
1327  * journal reservation - journal entry is open means journal is dirty:
1328  */
1329 static int journal_entry_open(struct journal *j)
1330 {
1331         struct journal_buf *buf = journal_cur_buf(j);
1332         ssize_t u64s;
1333         int ret = 0, sectors;
1334
1335         lockdep_assert_held(&j->lock);
1336         BUG_ON(journal_entry_is_open(j));
1337
1338         if (!fifo_free(&j->pin))
1339                 return 0;
1340
1341         sectors = journal_entry_sectors(j);
1342         if (sectors <= 0)
1343                 return sectors;
1344
1345         buf->disk_sectors       = sectors;
1346
1347         sectors = min_t(unsigned, sectors, buf->size >> 9);
1348         j->cur_buf_sectors      = sectors;
1349
1350         u64s = (sectors << 9) / sizeof(u64);
1351
1352         /* Subtract the journal header */
1353         u64s -= sizeof(struct jset) / sizeof(u64);
1354         /*
1355          * Btree roots, prio pointers don't get added until right before we do
1356          * the write:
1357          */
1358         u64s -= journal_entry_u64s_reserve(buf);
1359         u64s  = max_t(ssize_t, 0L, u64s);
1360
1361         BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
1362
1363         if (u64s > le32_to_cpu(buf->data->u64s)) {
1364                 union journal_res_state old, new;
1365                 u64 v = atomic64_read(&j->reservations.counter);
1366
1367                 /*
1368                  * Must be set before marking the journal entry as open:
1369                  */
1370                 j->cur_entry_u64s = u64s;
1371
1372                 do {
1373                         old.v = new.v = v;
1374
1375                         if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1376                                 return false;
1377
1378                         /* Handle any already added entries */
1379                         new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
1380                 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1381                                                old.v, new.v)) != old.v);
1382                 ret = 1;
1383
1384                 wake_up(&j->wait);
1385
1386                 if (j->res_get_blocked_start) {
1387                         __bch2_time_stats_update(j->blocked_time,
1388                                                 j->res_get_blocked_start);
1389                         j->res_get_blocked_start = 0;
1390                 }
1391
1392                 mod_delayed_work(system_freezable_wq,
1393                                  &j->write_work,
1394                                  msecs_to_jiffies(j->write_delay_ms));
1395         }
1396
1397         return ret;
1398 }
1399
1400 void bch2_journal_start(struct bch_fs *c)
1401 {
1402         struct journal *j = &c->journal;
1403         struct journal_seq_blacklist *bl;
1404         u64 new_seq = 0;
1405
1406         list_for_each_entry(bl, &j->seq_blacklist, list)
1407                 new_seq = max(new_seq, bl->seq);
1408
1409         spin_lock(&j->lock);
1410
1411         set_bit(JOURNAL_STARTED, &j->flags);
1412
1413         while (atomic64_read(&j->seq) < new_seq)
1414                 __journal_entry_new(j, 0);
1415
1416         /*
1417          * journal_buf_switch() only inits the next journal entry when it
1418          * closes an open journal entry - the very first journal entry gets
1419          * initialized here:
1420          */
1421         __bch2_journal_next_entry(j);
1422
1423         /*
1424          * Adding entries to the next journal entry before allocating space on
1425          * disk for the next journal entry - this is ok, because these entries
1426          * only have to go down with the next journal entry we write:
1427          */
1428         list_for_each_entry(bl, &j->seq_blacklist, list)
1429                 if (!bl->written) {
1430                         bch2_journal_add_entry_noreservation(journal_cur_buf(j),
1431                                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED,
1432                                         0, 0, &bl->seq, 1);
1433
1434                         journal_pin_add_entry(j,
1435                                               &fifo_peek_back(&j->pin),
1436                                               &bl->pin,
1437                                               journal_seq_blacklist_flush);
1438                         bl->written = true;
1439                 }
1440
1441         spin_unlock(&j->lock);
1442
1443         queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
1444 }
1445
1446 int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
1447 {
1448         struct journal *j = &c->journal;
1449         struct bkey_i *k, *_n;
1450         struct jset_entry *entry;
1451         struct journal_replay *i, *n;
1452         int ret = 0, did_replay = 0;
1453
1454         list_for_each_entry_safe(i, n, list, list) {
1455                 j->replay_pin_list =
1456                         journal_seq_pin(j, le64_to_cpu(i->j.seq));
1457
1458                 for_each_jset_key(k, _n, entry, &i->j) {
1459                         struct disk_reservation disk_res;
1460
1461                         if (entry->btree_id == BTREE_ID_ALLOC) {
1462                                 /*
1463                                  * allocation code handles replay for
1464                                  * BTREE_ID_ALLOC keys:
1465                                  */
1466                                 ret = bch2_alloc_replay_key(c, k->k.p);
1467                         } else {
1468
1469                                 /*
1470                                  * We might cause compressed extents to be
1471                                  * split, so we need to pass in a
1472                                  * disk_reservation:
1473                                  */
1474                                 BUG_ON(bch2_disk_reservation_get(c, &disk_res, 0, 0));
1475
1476                                 ret = bch2_btree_insert(c, entry->btree_id, k,
1477                                                         &disk_res, NULL, NULL,
1478                                                         BTREE_INSERT_NOFAIL|
1479                                                         BTREE_INSERT_JOURNAL_REPLAY);
1480                                 bch2_disk_reservation_put(c, &disk_res);
1481                         }
1482
1483                         if (ret) {
1484                                 bch_err(c, "journal replay: error %d while replaying key",
1485                                         ret);
1486                                 goto err;
1487                         }
1488
1489                         cond_resched();
1490                         did_replay = true;
1491                 }
1492
1493                 if (atomic_dec_and_test(&j->replay_pin_list->count))
1494                         wake_up(&j->wait);
1495         }
1496
1497         j->replay_pin_list = NULL;
1498
1499         if (did_replay) {
1500                 bch2_journal_flush_pins(&c->journal, U64_MAX);
1501
1502                 /*
1503                  * Write a new journal entry _before_ we start journalling new data -
1504                  * otherwise, we could end up with btree node bsets with journal seqs
1505                  * arbitrarily far in the future vs. the most recently written journal
1506                  * entry on disk, if we crash before writing the next journal entry:
1507                  */
1508                 ret = bch2_journal_meta(j);
1509                 if (ret) {
1510                         bch_err(c, "journal replay: error %d flushing journal", ret);
1511                         goto err;
1512                 }
1513         }
1514
1515         bch2_journal_set_replay_done(j);
1516 err:
1517         bch2_journal_entries_free(list);
1518         return ret;
1519 }
1520
1521 /*
1522  * Allocate more journal space at runtime - not currently making use if it, but
1523  * the code works:
1524  */
1525 static int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
1526                                        unsigned nr)
1527 {
1528         struct journal *j = &c->journal;
1529         struct journal_device *ja = &ca->journal;
1530         struct bch_sb_field_journal *journal_buckets;
1531         struct disk_reservation disk_res = { 0, 0 };
1532         struct closure cl;
1533         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
1534         int ret = 0;
1535
1536         closure_init_stack(&cl);
1537
1538         /* don't handle reducing nr of buckets yet: */
1539         if (nr <= ja->nr)
1540                 return 0;
1541
1542         /*
1543          * note: journal buckets aren't really counted as _sectors_ used yet, so
1544          * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
1545          * when space used goes up without a reservation - but we do need the
1546          * reservation to ensure we'll actually be able to allocate:
1547          */
1548
1549         if (bch2_disk_reservation_get(c, &disk_res,
1550                         bucket_to_sector(ca, nr - ja->nr), 0))
1551                 return -ENOSPC;
1552
1553         mutex_lock(&c->sb_lock);
1554
1555         ret = -ENOMEM;
1556         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1557         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1558         if (!new_buckets || !new_bucket_seq)
1559                 goto err;
1560
1561         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1562                                 nr + sizeof(*journal_buckets) / sizeof(u64));
1563         if (!journal_buckets)
1564                 goto err;
1565
1566         spin_lock(&j->lock);
1567         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
1568         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
1569         swap(new_buckets,       ja->buckets);
1570         swap(new_bucket_seq,    ja->bucket_seq);
1571
1572         while (ja->nr < nr) {
1573                 /* must happen under journal lock, to avoid racing with gc: */
1574                 long b = bch2_bucket_alloc(c, ca, RESERVE_ALLOC);
1575                 if (b < 0) {
1576                         if (!closure_wait(&c->freelist_wait, &cl)) {
1577                                 spin_unlock(&j->lock);
1578                                 closure_sync(&cl);
1579                                 spin_lock(&j->lock);
1580                         }
1581                         continue;
1582                 }
1583
1584                 bch2_mark_metadata_bucket(ca, &ca->buckets[b],
1585                                          BUCKET_JOURNAL, false);
1586                 bch2_mark_alloc_bucket(ca, &ca->buckets[b], false);
1587
1588                 memmove(ja->buckets + ja->last_idx + 1,
1589                         ja->buckets + ja->last_idx,
1590                         (ja->nr - ja->last_idx) * sizeof(u64));
1591                 memmove(ja->bucket_seq + ja->last_idx + 1,
1592                         ja->bucket_seq + ja->last_idx,
1593                         (ja->nr - ja->last_idx) * sizeof(u64));
1594                 memmove(journal_buckets->buckets + ja->last_idx + 1,
1595                         journal_buckets->buckets + ja->last_idx,
1596                         (ja->nr - ja->last_idx) * sizeof(u64));
1597
1598                 ja->buckets[ja->last_idx] = b;
1599                 journal_buckets->buckets[ja->last_idx] = cpu_to_le64(b);
1600
1601                 if (ja->last_idx < ja->nr) {
1602                         if (ja->cur_idx >= ja->last_idx)
1603                                 ja->cur_idx++;
1604                         ja->last_idx++;
1605                 }
1606                 ja->nr++;
1607
1608         }
1609         spin_unlock(&j->lock);
1610
1611         BUG_ON(bch2_sb_validate_journal(ca->disk_sb.sb, ca->mi));
1612
1613         bch2_write_super(c);
1614
1615         ret = 0;
1616 err:
1617         mutex_unlock(&c->sb_lock);
1618
1619         kfree(new_bucket_seq);
1620         kfree(new_buckets);
1621         bch2_disk_reservation_put(c, &disk_res);
1622
1623         if (!ret)
1624                 bch2_dev_allocator_add(c, ca);
1625
1626         return ret;
1627 }
1628
1629 int bch2_dev_journal_alloc(struct bch_dev *ca)
1630 {
1631         unsigned nr;
1632
1633         if (dynamic_fault("bcachefs:add:journal_alloc"))
1634                 return -ENOMEM;
1635
1636         /*
1637          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
1638          * is smaller:
1639          */
1640         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
1641                      BCH_JOURNAL_BUCKETS_MIN,
1642                      min(1 << 10,
1643                          (1 << 20) / ca->mi.bucket_size));
1644
1645         return bch2_set_nr_journal_buckets(ca->fs, ca, nr);
1646 }
1647
1648 /* Journalling */
1649
1650 /**
1651  * journal_reclaim_fast - do the fast part of journal reclaim
1652  *
1653  * Called from IO submission context, does not block. Cleans up after btree
1654  * write completions by advancing the journal pin and each cache's last_idx,
1655  * kicking off discards and background reclaim as necessary.
1656  */
1657 static void journal_reclaim_fast(struct journal *j)
1658 {
1659         struct journal_entry_pin_list temp;
1660         bool popped = false;
1661
1662         lockdep_assert_held(&j->lock);
1663
1664         /*
1665          * Unpin journal entries whose reference counts reached zero, meaning
1666          * all btree nodes got written out
1667          */
1668         while (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1669                 BUG_ON(!list_empty(&fifo_peek_front(&j->pin).list));
1670                 BUG_ON(!fifo_pop(&j->pin, temp));
1671                 popped = true;
1672         }
1673
1674         if (popped)
1675                 wake_up(&j->wait);
1676 }
1677
1678 /*
1679  * Journal entry pinning - machinery for holding a reference on a given journal
1680  * entry, marking it as dirty:
1681  */
1682
1683 static inline void __journal_pin_add(struct journal *j,
1684                                      struct journal_entry_pin_list *pin_list,
1685                                      struct journal_entry_pin *pin,
1686                                      journal_pin_flush_fn flush_fn)
1687 {
1688         BUG_ON(journal_pin_active(pin));
1689
1690         atomic_inc(&pin_list->count);
1691         pin->pin_list   = pin_list;
1692         pin->flush      = flush_fn;
1693
1694         if (flush_fn)
1695                 list_add(&pin->list, &pin_list->list);
1696         else
1697                 INIT_LIST_HEAD(&pin->list);
1698 }
1699
1700 static void journal_pin_add_entry(struct journal *j,
1701                                   struct journal_entry_pin_list *pin_list,
1702                                   struct journal_entry_pin *pin,
1703                                   journal_pin_flush_fn flush_fn)
1704 {
1705         spin_lock_irq(&j->pin_lock);
1706         __journal_pin_add(j, pin_list, pin, flush_fn);
1707         spin_unlock_irq(&j->pin_lock);
1708 }
1709
1710 void bch2_journal_pin_add(struct journal *j,
1711                           struct journal_res *res,
1712                           struct journal_entry_pin *pin,
1713                           journal_pin_flush_fn flush_fn)
1714 {
1715         struct journal_entry_pin_list *pin_list = res->ref
1716                 ? journal_seq_pin(j, res->seq)
1717                 : j->replay_pin_list;
1718
1719         spin_lock_irq(&j->pin_lock);
1720         __journal_pin_add(j, pin_list, pin, flush_fn);
1721         spin_unlock_irq(&j->pin_lock);
1722 }
1723
1724 static inline bool __journal_pin_drop(struct journal *j,
1725                                       struct journal_entry_pin *pin)
1726 {
1727         struct journal_entry_pin_list *pin_list = pin->pin_list;
1728
1729         pin->pin_list = NULL;
1730
1731         /* journal_reclaim_work() might have already taken us off the list */
1732         if (!list_empty_careful(&pin->list))
1733                 list_del_init(&pin->list);
1734
1735         return atomic_dec_and_test(&pin_list->count);
1736 }
1737
1738 void bch2_journal_pin_drop(struct journal *j,
1739                           struct journal_entry_pin *pin)
1740 {
1741         unsigned long flags;
1742         bool wakeup = false;
1743
1744         spin_lock_irqsave(&j->pin_lock, flags);
1745         if (journal_pin_active(pin))
1746                 wakeup = __journal_pin_drop(j, pin);
1747         spin_unlock_irqrestore(&j->pin_lock, flags);
1748
1749         /*
1750          * Unpinning a journal entry make make journal_next_bucket() succeed, if
1751          * writing a new last_seq will now make another bucket available:
1752          *
1753          * Nested irqsave is expensive, don't do the wakeup with lock held:
1754          */
1755         if (wakeup)
1756                 wake_up(&j->wait);
1757 }
1758
1759 void bch2_journal_pin_add_if_older(struct journal *j,
1760                                   struct journal_entry_pin *src_pin,
1761                                   struct journal_entry_pin *pin,
1762                                   journal_pin_flush_fn flush_fn)
1763 {
1764         spin_lock_irq(&j->pin_lock);
1765
1766         if (journal_pin_active(src_pin) &&
1767             (!journal_pin_active(pin) ||
1768              fifo_entry_idx(&j->pin, src_pin->pin_list) <
1769              fifo_entry_idx(&j->pin, pin->pin_list))) {
1770                 if (journal_pin_active(pin))
1771                         __journal_pin_drop(j, pin);
1772                 __journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
1773         }
1774
1775         spin_unlock_irq(&j->pin_lock);
1776 }
1777
1778 static struct journal_entry_pin *
1779 journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1780 {
1781         struct journal_entry_pin_list *pin_list;
1782         struct journal_entry_pin *ret = NULL;
1783         unsigned iter;
1784
1785         /* so we don't iterate over empty fifo entries below: */
1786         if (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1787                 spin_lock(&j->lock);
1788                 journal_reclaim_fast(j);
1789                 spin_unlock(&j->lock);
1790         }
1791
1792         spin_lock_irq(&j->pin_lock);
1793         fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
1794                 if (journal_pin_seq(j, pin_list) > seq_to_flush)
1795                         break;
1796
1797                 ret = list_first_entry_or_null(&pin_list->list,
1798                                 struct journal_entry_pin, list);
1799                 if (ret) {
1800                         /* must be list_del_init(), see bch2_journal_pin_drop() */
1801                         list_move(&ret->list, &pin_list->flushed);
1802                         *seq = journal_pin_seq(j, pin_list);
1803                         break;
1804                 }
1805         }
1806         spin_unlock_irq(&j->pin_lock);
1807
1808         return ret;
1809 }
1810
1811 static bool journal_flush_done(struct journal *j, u64 seq_to_flush)
1812 {
1813         bool ret;
1814
1815         spin_lock(&j->lock);
1816         journal_reclaim_fast(j);
1817
1818         ret = (fifo_used(&j->pin) == 1 &&
1819                atomic_read(&fifo_peek_front(&j->pin).count) == 1) ||
1820                 last_seq(j) > seq_to_flush;
1821         spin_unlock(&j->lock);
1822
1823         return ret;
1824 }
1825
1826 void bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
1827 {
1828         struct journal_entry_pin *pin;
1829         u64 pin_seq;
1830
1831         if (!test_bit(JOURNAL_STARTED, &j->flags))
1832                 return;
1833
1834         while ((pin = journal_get_next_pin(j, seq_to_flush, &pin_seq)))
1835                 pin->flush(j, pin, pin_seq);
1836
1837         wait_event(j->wait,
1838                    journal_flush_done(j, seq_to_flush) ||
1839                    bch2_journal_error(j));
1840 }
1841
1842 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
1843 {
1844         bool ret;
1845
1846         spin_lock(&j->lock);
1847         ret = ja->nr &&
1848                 (ja->last_idx != ja->cur_idx &&
1849                  ja->bucket_seq[ja->last_idx] < j->last_seq_ondisk);
1850         spin_unlock(&j->lock);
1851
1852         return ret;
1853 }
1854
1855 /**
1856  * journal_reclaim_work - free up journal buckets
1857  *
1858  * Background journal reclaim writes out btree nodes. It should be run
1859  * early enough so that we never completely run out of journal buckets.
1860  *
1861  * High watermarks for triggering background reclaim:
1862  * - FIFO has fewer than 512 entries left
1863  * - fewer than 25% journal buckets free
1864  *
1865  * Background reclaim runs until low watermarks are reached:
1866  * - FIFO has more than 1024 entries left
1867  * - more than 50% journal buckets free
1868  *
1869  * As long as a reclaim can complete in the time it takes to fill up
1870  * 512 journal entries or 25% of all journal buckets, then
1871  * journal_next_bucket() should not stall.
1872  */
1873 static void journal_reclaim_work(struct work_struct *work)
1874 {
1875         struct bch_fs *c = container_of(to_delayed_work(work),
1876                                 struct bch_fs, journal.reclaim_work);
1877         struct journal *j = &c->journal;
1878         struct bch_dev *ca;
1879         struct journal_entry_pin *pin;
1880         u64 seq, seq_to_flush = 0;
1881         unsigned iter, bucket_to_flush;
1882         unsigned long next_flush;
1883         bool reclaim_lock_held = false, need_flush;
1884
1885         /*
1886          * Advance last_idx to point to the oldest journal entry containing
1887          * btree node updates that have not yet been written out
1888          */
1889         for_each_rw_member(ca, c, iter) {
1890                 struct journal_device *ja = &ca->journal;
1891
1892                 if (!ja->nr)
1893                         continue;
1894
1895                 while (should_discard_bucket(j, ja)) {
1896                         if (!reclaim_lock_held) {
1897                                 /*
1898                                  * ugh:
1899                                  * might be called from __journal_res_get()
1900                                  * under wait_event() - have to go back to
1901                                  * TASK_RUNNING before doing something that
1902                                  * would block, but only if we're doing work:
1903                                  */
1904                                 __set_current_state(TASK_RUNNING);
1905
1906                                 mutex_lock(&j->reclaim_lock);
1907                                 reclaim_lock_held = true;
1908                                 /* recheck under reclaim_lock: */
1909                                 continue;
1910                         }
1911
1912                         if (ca->mi.discard &&
1913                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
1914                                 blkdev_issue_discard(ca->disk_sb.bdev,
1915                                         bucket_to_sector(ca,
1916                                                 ja->buckets[ja->last_idx]),
1917                                         ca->mi.bucket_size, GFP_NOIO, 0);
1918
1919                         spin_lock(&j->lock);
1920                         ja->last_idx = (ja->last_idx + 1) % ja->nr;
1921                         spin_unlock(&j->lock);
1922
1923                         wake_up(&j->wait);
1924                 }
1925
1926                 /*
1927                  * Write out enough btree nodes to free up 50% journal
1928                  * buckets
1929                  */
1930                 spin_lock(&j->lock);
1931                 bucket_to_flush = (ja->cur_idx + (ja->nr >> 1)) % ja->nr;
1932                 seq_to_flush = max_t(u64, seq_to_flush,
1933                                      ja->bucket_seq[bucket_to_flush]);
1934                 spin_unlock(&j->lock);
1935         }
1936
1937         if (reclaim_lock_held)
1938                 mutex_unlock(&j->reclaim_lock);
1939
1940         /* Also flush if the pin fifo is more than half full */
1941         seq_to_flush = max_t(s64, seq_to_flush,
1942                              (s64) atomic64_read(&j->seq) -
1943                              (j->pin.size >> 1));
1944
1945         /*
1946          * If it's been longer than j->reclaim_delay_ms since we last flushed,
1947          * make sure to flush at least one journal pin:
1948          */
1949         next_flush = j->last_flushed + msecs_to_jiffies(j->reclaim_delay_ms);
1950         need_flush = time_after(jiffies, next_flush);
1951
1952         while ((pin = journal_get_next_pin(j, need_flush
1953                                            ? U64_MAX
1954                                            : seq_to_flush, &seq))) {
1955                 __set_current_state(TASK_RUNNING);
1956                 pin->flush(j, pin, seq);
1957                 need_flush = false;
1958
1959                 j->last_flushed = jiffies;
1960         }
1961
1962         if (!test_bit(BCH_FS_RO, &c->flags))
1963                 queue_delayed_work(system_freezable_wq, &j->reclaim_work,
1964                                    msecs_to_jiffies(j->reclaim_delay_ms));
1965 }
1966
1967 /**
1968  * journal_next_bucket - move on to the next journal bucket if possible
1969  */
1970 static int journal_write_alloc(struct journal *j, unsigned sectors)
1971 {
1972         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1973         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
1974         struct bch_extent_ptr *ptr;
1975         struct journal_device *ja;
1976         struct bch_dev *ca;
1977         struct dev_alloc_list devs_sorted;
1978         unsigned i, replicas, replicas_want =
1979                 READ_ONCE(c->opts.metadata_replicas);
1980
1981         spin_lock(&j->lock);
1982
1983         /*
1984          * Drop any pointers to devices that have been removed, are no longer
1985          * empty, or filled up their current journal bucket:
1986          *
1987          * Note that a device may have had a small amount of free space (perhaps
1988          * one sector) that wasn't enough for the smallest possible journal
1989          * entry - that's why we drop pointers to devices <= current free space,
1990          * i.e. whichever device was limiting the current journal entry size.
1991          */
1992         extent_for_each_ptr_backwards(e, ptr) {
1993                 ca = c->devs[ptr->dev];
1994
1995                 if (ca->mi.state != BCH_MEMBER_STATE_RW ||
1996                     ca->journal.sectors_free <= sectors)
1997                         __bch2_extent_drop_ptr(e, ptr);
1998                 else
1999                         ca->journal.sectors_free -= sectors;
2000         }
2001
2002         replicas = bch2_extent_nr_ptrs(e.c);
2003
2004         rcu_read_lock();
2005         devs_sorted = bch2_wp_alloc_list(c, &j->wp,
2006                                          &c->rw_devs[BCH_DATA_JOURNAL]);
2007
2008         for (i = 0; i < devs_sorted.nr; i++) {
2009                 ca = rcu_dereference(c->devs[devs_sorted.devs[i]]);
2010                 if (!ca)
2011                         continue;
2012
2013                 ja = &ca->journal;
2014                 if (!ja->nr)
2015                         continue;
2016
2017                 if (replicas >= replicas_want)
2018                         break;
2019
2020                 /*
2021                  * Check that we can use this device, and aren't already using
2022                  * it:
2023                  */
2024                 if (bch2_extent_has_device(e.c, ca->dev_idx) ||
2025                     !journal_dev_buckets_available(j, ca) ||
2026                     sectors > ca->mi.bucket_size)
2027                         continue;
2028
2029                 j->wp.next_alloc[ca->dev_idx] += U32_MAX;
2030                 bch2_wp_rescale(c, ca, &j->wp);
2031
2032                 ja->sectors_free = ca->mi.bucket_size - sectors;
2033                 ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
2034                 ja->bucket_seq[ja->cur_idx] = atomic64_read(&j->seq);
2035
2036                 extent_ptr_append(bkey_i_to_extent(&j->key),
2037                         (struct bch_extent_ptr) {
2038                                   .offset = bucket_to_sector(ca,
2039                                         ja->buckets[ja->cur_idx]),
2040                                   .dev = ca->dev_idx,
2041                 });
2042                 replicas++;
2043         }
2044         rcu_read_unlock();
2045
2046         j->prev_buf_sectors = 0;
2047         spin_unlock(&j->lock);
2048
2049         if (replicas < c->opts.metadata_replicas_required)
2050                 return -EROFS;
2051
2052         BUG_ON(!replicas);
2053
2054         return 0;
2055 }
2056
2057 static void journal_write_compact(struct jset *jset)
2058 {
2059         struct jset_entry *i, *next, *prev = NULL;
2060
2061         /*
2062          * Simple compaction, dropping empty jset_entries (from journal
2063          * reservations that weren't fully used) and merging jset_entries that
2064          * can be.
2065          *
2066          * If we wanted to be really fancy here, we could sort all the keys in
2067          * the jset and drop keys that were overwritten - probably not worth it:
2068          */
2069         vstruct_for_each_safe(jset, i, next) {
2070                 unsigned u64s = le16_to_cpu(i->u64s);
2071
2072                 /* Empty entry: */
2073                 if (!u64s)
2074                         continue;
2075
2076                 /* Can we merge with previous entry? */
2077                 if (prev &&
2078                     i->btree_id == prev->btree_id &&
2079                     i->level    == prev->level &&
2080                     i->type     == prev->type &&
2081                     i->type     == JOURNAL_ENTRY_BTREE_KEYS &&
2082                     le16_to_cpu(prev->u64s) + u64s <= U16_MAX) {
2083                         memmove_u64s_down(vstruct_next(prev),
2084                                           i->_data,
2085                                           u64s);
2086                         le16_add_cpu(&prev->u64s, u64s);
2087                         continue;
2088                 }
2089
2090                 /* Couldn't merge, move i into new position (after prev): */
2091                 prev = prev ? vstruct_next(prev) : jset->start;
2092                 if (i != prev)
2093                         memmove_u64s_down(prev, i, jset_u64s(u64s));
2094         }
2095
2096         prev = prev ? vstruct_next(prev) : jset->start;
2097         jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
2098 }
2099
2100 static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
2101 {
2102         /* we aren't holding j->lock: */
2103         unsigned new_size = READ_ONCE(j->buf_size_want);
2104         void *new_buf;
2105
2106         if (buf->size >= new_size)
2107                 return;
2108
2109         new_buf = kvpmalloc(new_size, GFP_NOIO|__GFP_NOWARN);
2110         if (!new_buf)
2111                 return;
2112
2113         memcpy(new_buf, buf->data, buf->size);
2114         kvpfree(buf->data, buf->size);
2115         buf->data       = new_buf;
2116         buf->size       = new_size;
2117 }
2118
2119 static void journal_write_done(struct closure *cl)
2120 {
2121         struct journal *j = container_of(cl, struct journal, io);
2122         struct journal_buf *w = journal_prev_buf(j);
2123
2124         __bch2_time_stats_update(j->write_time, j->write_start_time);
2125
2126         spin_lock(&j->lock);
2127         j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
2128
2129         /*
2130          * Updating last_seq_ondisk may let journal_reclaim_work() discard more
2131          * buckets:
2132          *
2133          * Must come before signaling write completion, for
2134          * bch2_fs_journal_stop():
2135          */
2136         mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
2137
2138         /* also must come before signalling write completion: */
2139         closure_debug_destroy(cl);
2140
2141         BUG_ON(!j->reservations.prev_buf_unwritten);
2142         atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
2143                      &j->reservations.counter);
2144
2145         closure_wake_up(&w->wait);
2146         wake_up(&j->wait);
2147
2148         if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
2149                 mod_delayed_work(system_freezable_wq, &j->write_work, 0);
2150         spin_unlock(&j->lock);
2151 }
2152
2153 static void journal_write_error(struct closure *cl)
2154 {
2155         struct journal *j = container_of(cl, struct journal, io);
2156         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2157         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
2158
2159         while (j->replicas_failed) {
2160                 unsigned idx = __fls(j->replicas_failed);
2161
2162                 bch2_extent_drop_ptr_idx(e, idx);
2163                 j->replicas_failed ^= 1 << idx;
2164         }
2165
2166         if (!bch2_extent_nr_ptrs(e.c)) {
2167                 bch_err(c, "unable to write journal to sufficient devices");
2168                 goto err;
2169         }
2170
2171         if (bch2_check_mark_super(c, e.c, BCH_DATA_JOURNAL))
2172                 goto err;
2173
2174 out:
2175         journal_write_done(cl);
2176         return;
2177 err:
2178         bch2_fatal_error(c);
2179         bch2_journal_halt(j);
2180         goto out;
2181 }
2182
2183 static void journal_write_endio(struct bio *bio)
2184 {
2185         struct bch_dev *ca = bio->bi_private;
2186         struct journal *j = &ca->fs->journal;
2187
2188         if (bch2_dev_io_err_on(bio->bi_error, ca, "journal write") ||
2189             bch2_meta_write_fault("journal")) {
2190                 /* Was this a flush or an actual journal write? */
2191                 if (ca->journal.ptr_idx != U8_MAX) {
2192                         set_bit(ca->journal.ptr_idx, &j->replicas_failed);
2193                         set_closure_fn(&j->io, journal_write_error,
2194                                        system_highpri_wq);
2195                 }
2196         }
2197
2198         closure_put(&j->io);
2199         percpu_ref_put(&ca->io_ref);
2200 }
2201
2202 static void journal_write(struct closure *cl)
2203 {
2204         struct journal *j = container_of(cl, struct journal, io);
2205         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2206         struct bch_dev *ca;
2207         struct journal_buf *w = journal_prev_buf(j);
2208         struct jset *jset;
2209         struct bio *bio;
2210         struct bch_extent_ptr *ptr;
2211         unsigned i, sectors, bytes, ptr_idx = 0;
2212
2213         journal_buf_realloc(j, w);
2214         jset = w->data;
2215
2216         j->write_start_time = local_clock();
2217         mutex_lock(&c->btree_root_lock);
2218         for (i = 0; i < BTREE_ID_NR; i++) {
2219                 struct btree_root *r = &c->btree_roots[i];
2220
2221                 if (r->alive)
2222                         bch2_journal_add_btree_root(w, i, &r->key, r->level);
2223         }
2224         mutex_unlock(&c->btree_root_lock);
2225
2226         journal_write_compact(jset);
2227
2228         jset->read_clock        = cpu_to_le16(c->prio_clock[READ].hand);
2229         jset->write_clock       = cpu_to_le16(c->prio_clock[WRITE].hand);
2230         jset->magic             = cpu_to_le64(jset_magic(c));
2231         jset->version           = cpu_to_le32(BCACHE_JSET_VERSION);
2232
2233         SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
2234         SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
2235
2236         if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2237             __journal_entry_validate(c, jset, WRITE))
2238                 goto err;
2239
2240         bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
2241                     jset->encrypted_start,
2242                     vstruct_end(jset) - (void *) jset->encrypted_start);
2243
2244         jset->csum = csum_vstruct(c, JSET_CSUM_TYPE(jset),
2245                                   journal_nonce(jset), jset);
2246
2247         if (!bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2248             __journal_entry_validate(c, jset, WRITE))
2249                 goto err;
2250
2251         sectors = vstruct_sectors(jset, c->block_bits);
2252         BUG_ON(sectors > j->prev_buf_sectors);
2253
2254         bytes = vstruct_bytes(w->data);
2255         memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
2256
2257         if (journal_write_alloc(j, sectors)) {
2258                 bch2_journal_halt(j);
2259                 bch_err(c, "Unable to allocate journal write");
2260                 bch2_fatal_error(c);
2261                 continue_at(cl, journal_write_done, system_highpri_wq);
2262         }
2263
2264         if (bch2_check_mark_super(c, bkey_i_to_s_c_extent(&j->key),
2265                                   BCH_DATA_JOURNAL))
2266                 goto err;
2267
2268         /*
2269          * XXX: we really should just disable the entire journal in nochanges
2270          * mode
2271          */
2272         if (c->opts.nochanges)
2273                 goto no_io;
2274
2275         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr) {
2276                 ca = c->devs[ptr->dev];
2277                 if (!percpu_ref_tryget(&ca->io_ref)) {
2278                         /* XXX: fix this */
2279                         bch_err(c, "missing device for journal write\n");
2280                         continue;
2281                 }
2282
2283                 this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_JOURNAL],
2284                              sectors);
2285
2286                 ca->journal.ptr_idx     = ptr_idx++;
2287                 bio = ca->journal.bio;
2288                 bio_reset(bio);
2289                 bio->bi_iter.bi_sector  = ptr->offset;
2290                 bio->bi_bdev            = ca->disk_sb.bdev;
2291                 bio->bi_iter.bi_size    = sectors << 9;
2292                 bio->bi_end_io          = journal_write_endio;
2293                 bio->bi_private         = ca;
2294                 bio_set_op_attrs(bio, REQ_OP_WRITE,
2295                                  REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
2296                 bch2_bio_map(bio, jset);
2297
2298                 trace_journal_write(bio);
2299                 closure_bio_submit(bio, cl);
2300
2301                 ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
2302         }
2303
2304         for_each_rw_member(ca, c, i)
2305                 if (journal_flushes_device(ca) &&
2306                     !bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key), i)) {
2307                         percpu_ref_get(&ca->io_ref);
2308
2309                         ca->journal.ptr_idx = U8_MAX;
2310                         bio = ca->journal.bio;
2311                         bio_reset(bio);
2312                         bio->bi_bdev            = ca->disk_sb.bdev;
2313                         bio->bi_opf             = REQ_OP_FLUSH;
2314                         bio->bi_end_io          = journal_write_endio;
2315                         bio->bi_private         = ca;
2316                         closure_bio_submit(bio, cl);
2317                 }
2318
2319 no_io:
2320         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr)
2321                 ptr->offset += sectors;
2322
2323         continue_at(cl, journal_write_done, system_highpri_wq);
2324 err:
2325         bch2_inconsistent_error(c);
2326         continue_at(cl, journal_write_done, system_highpri_wq);
2327 }
2328
2329 static void journal_write_work(struct work_struct *work)
2330 {
2331         struct journal *j = container_of(to_delayed_work(work),
2332                                          struct journal, write_work);
2333         spin_lock(&j->lock);
2334         if (!journal_entry_is_open(j)) {
2335                 spin_unlock(&j->lock);
2336                 return;
2337         }
2338
2339         set_bit(JOURNAL_NEED_WRITE, &j->flags);
2340         if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
2341                 spin_unlock(&j->lock);
2342 }
2343
2344 /*
2345  * Given an inode number, if that inode number has data in the journal that
2346  * hasn't yet been flushed, return the journal sequence number that needs to be
2347  * flushed:
2348  */
2349 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
2350 {
2351         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
2352         u64 seq = 0;
2353
2354         if (!test_bit(h, j->buf[0].has_inode) &&
2355             !test_bit(h, j->buf[1].has_inode))
2356                 return 0;
2357
2358         spin_lock(&j->lock);
2359         if (test_bit(h, journal_cur_buf(j)->has_inode))
2360                 seq = atomic64_read(&j->seq);
2361         else if (test_bit(h, journal_prev_buf(j)->has_inode))
2362                 seq = atomic64_read(&j->seq) - 1;
2363         spin_unlock(&j->lock);
2364
2365         return seq;
2366 }
2367
2368 static int __journal_res_get(struct journal *j, struct journal_res *res,
2369                               unsigned u64s_min, unsigned u64s_max)
2370 {
2371         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2372         struct journal_buf *buf;
2373         int ret;
2374 retry:
2375         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2376         if (ret)
2377                 return ret;
2378
2379         spin_lock(&j->lock);
2380         /*
2381          * Recheck after taking the lock, so we don't race with another thread
2382          * that just did journal_entry_open() and call journal_entry_close()
2383          * unnecessarily
2384          */
2385         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2386         if (ret) {
2387                 spin_unlock(&j->lock);
2388                 return 1;
2389         }
2390
2391         /*
2392          * If we couldn't get a reservation because the current buf filled up,
2393          * and we had room for a bigger entry on disk, signal that we want to
2394          * realloc the journal bufs:
2395          */
2396         buf = journal_cur_buf(j);
2397         if (journal_entry_is_open(j) &&
2398             buf->size >> 9 < buf->disk_sectors &&
2399             buf->size < JOURNAL_ENTRY_SIZE_MAX)
2400                 j->buf_size_want = max(j->buf_size_want, buf->size << 1);
2401
2402         /*
2403          * Close the current journal entry if necessary, then try to start a new
2404          * one:
2405          */
2406         switch (journal_buf_switch(j, false)) {
2407         case JOURNAL_ENTRY_ERROR:
2408                 spin_unlock(&j->lock);
2409                 return -EROFS;
2410         case JOURNAL_ENTRY_INUSE:
2411                 /* haven't finished writing out the previous one: */
2412                 spin_unlock(&j->lock);
2413                 trace_journal_entry_full(c);
2414                 goto blocked;
2415         case JOURNAL_ENTRY_CLOSED:
2416                 break;
2417         case JOURNAL_UNLOCKED:
2418                 goto retry;
2419         }
2420
2421         /* We now have a new, closed journal buf - see if we can open it: */
2422         ret = journal_entry_open(j);
2423         spin_unlock(&j->lock);
2424
2425         if (ret < 0)
2426                 return ret;
2427         if (ret)
2428                 goto retry;
2429
2430         /* Journal's full, we have to wait */
2431
2432         /*
2433          * Direct reclaim - can't rely on reclaim from work item
2434          * due to freezing..
2435          */
2436         journal_reclaim_work(&j->reclaim_work.work);
2437
2438         trace_journal_full(c);
2439 blocked:
2440         if (!j->res_get_blocked_start)
2441                 j->res_get_blocked_start = local_clock() ?: 1;
2442         return 0;
2443 }
2444
2445 /*
2446  * Essentially the entry function to the journaling code. When bcachefs is doing
2447  * a btree insert, it calls this function to get the current journal write.
2448  * Journal write is the structure used set up journal writes. The calling
2449  * function will then add its keys to the structure, queuing them for the next
2450  * write.
2451  *
2452  * To ensure forward progress, the current task must not be holding any
2453  * btree node write locks.
2454  */
2455 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
2456                                  unsigned u64s_min, unsigned u64s_max)
2457 {
2458         int ret;
2459
2460         wait_event(j->wait,
2461                    (ret = __journal_res_get(j, res, u64s_min,
2462                                             u64s_max)));
2463         return ret < 0 ? ret : 0;
2464 }
2465
2466 void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
2467 {
2468         spin_lock(&j->lock);
2469
2470         BUG_ON(seq > atomic64_read(&j->seq));
2471
2472         if (bch2_journal_error(j)) {
2473                 spin_unlock(&j->lock);
2474                 return;
2475         }
2476
2477         if (seq == atomic64_read(&j->seq)) {
2478                 if (!closure_wait(&journal_cur_buf(j)->wait, parent))
2479                         BUG();
2480         } else if (seq + 1 == atomic64_read(&j->seq) &&
2481                    j->reservations.prev_buf_unwritten) {
2482                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2483                         BUG();
2484
2485                 smp_mb();
2486
2487                 /* check if raced with write completion (or failure) */
2488                 if (!j->reservations.prev_buf_unwritten ||
2489                     bch2_journal_error(j))
2490                         closure_wake_up(&journal_prev_buf(j)->wait);
2491         }
2492
2493         spin_unlock(&j->lock);
2494 }
2495
2496 void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
2497 {
2498         struct journal_buf *buf;
2499
2500         spin_lock(&j->lock);
2501
2502         BUG_ON(seq > atomic64_read(&j->seq));
2503
2504         if (bch2_journal_error(j)) {
2505                 spin_unlock(&j->lock);
2506                 return;
2507         }
2508
2509         if (seq == atomic64_read(&j->seq)) {
2510                 bool set_need_write = false;
2511
2512                 buf = journal_cur_buf(j);
2513
2514                 if (parent && !closure_wait(&buf->wait, parent))
2515                         BUG();
2516
2517                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2518                         j->need_write_time = local_clock();
2519                         set_need_write = true;
2520                 }
2521
2522                 switch (journal_buf_switch(j, set_need_write)) {
2523                 case JOURNAL_ENTRY_ERROR:
2524                         if (parent)
2525                                 closure_wake_up(&buf->wait);
2526                         break;
2527                 case JOURNAL_ENTRY_CLOSED:
2528                         /*
2529                          * Journal entry hasn't been opened yet, but caller
2530                          * claims it has something (seq == j->seq):
2531                          */
2532                         BUG();
2533                 case JOURNAL_ENTRY_INUSE:
2534                         break;
2535                 case JOURNAL_UNLOCKED:
2536                         return;
2537                 }
2538         } else if (parent &&
2539                    seq + 1 == atomic64_read(&j->seq) &&
2540                    j->reservations.prev_buf_unwritten) {
2541                 buf = journal_prev_buf(j);
2542
2543                 if (!closure_wait(&buf->wait, parent))
2544                         BUG();
2545
2546                 smp_mb();
2547
2548                 /* check if raced with write completion (or failure) */
2549                 if (!j->reservations.prev_buf_unwritten ||
2550                     bch2_journal_error(j))
2551                         closure_wake_up(&buf->wait);
2552         }
2553
2554         spin_unlock(&j->lock);
2555 }
2556
2557 static int journal_seq_flushed(struct journal *j, u64 seq)
2558 {
2559         struct journal_buf *buf;
2560         int ret = 1;
2561
2562         spin_lock(&j->lock);
2563         BUG_ON(seq > atomic64_read(&j->seq));
2564
2565         if (seq == atomic64_read(&j->seq)) {
2566                 bool set_need_write = false;
2567
2568                 ret = 0;
2569
2570                 buf = journal_cur_buf(j);
2571
2572                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2573                         j->need_write_time = local_clock();
2574                         set_need_write = true;
2575                 }
2576
2577                 switch (journal_buf_switch(j, set_need_write)) {
2578                 case JOURNAL_ENTRY_ERROR:
2579                         ret = -EIO;
2580                         break;
2581                 case JOURNAL_ENTRY_CLOSED:
2582                         /*
2583                          * Journal entry hasn't been opened yet, but caller
2584                          * claims it has something (seq == j->seq):
2585                          */
2586                         BUG();
2587                 case JOURNAL_ENTRY_INUSE:
2588                         break;
2589                 case JOURNAL_UNLOCKED:
2590                         return 0;
2591                 }
2592         } else if (seq + 1 == atomic64_read(&j->seq) &&
2593                    j->reservations.prev_buf_unwritten) {
2594                 ret = bch2_journal_error(j);
2595         }
2596
2597         spin_unlock(&j->lock);
2598
2599         return ret;
2600 }
2601
2602 int bch2_journal_flush_seq(struct journal *j, u64 seq)
2603 {
2604         u64 start_time = local_clock();
2605         int ret, ret2;
2606
2607         ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
2608
2609         bch2_time_stats_update(j->flush_seq_time, start_time);
2610
2611         return ret ?: ret2 < 0 ? ret2 : 0;
2612 }
2613
2614 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
2615 {
2616         struct journal_res res;
2617         unsigned u64s = jset_u64s(0);
2618
2619         memset(&res, 0, sizeof(res));
2620
2621         bch2_journal_res_get(j, &res, u64s, u64s);
2622         bch2_journal_res_put(j, &res);
2623
2624         bch2_journal_flush_seq_async(j, res.seq, parent);
2625 }
2626
2627 int bch2_journal_meta(struct journal *j)
2628 {
2629         struct journal_res res;
2630         unsigned u64s = jset_u64s(0);
2631         int ret;
2632
2633         memset(&res, 0, sizeof(res));
2634
2635         ret = bch2_journal_res_get(j, &res, u64s, u64s);
2636         if (ret)
2637                 return ret;
2638
2639         bch2_journal_res_put(j, &res);
2640
2641         return bch2_journal_flush_seq(j, res.seq);
2642 }
2643
2644 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
2645 {
2646         u64 seq, journal_seq;
2647
2648         spin_lock(&j->lock);
2649         journal_seq = atomic64_read(&j->seq);
2650
2651         if (journal_entry_is_open(j)) {
2652                 seq = journal_seq;
2653         } else if (journal_seq) {
2654                 seq = journal_seq - 1;
2655         } else {
2656                 spin_unlock(&j->lock);
2657                 return;
2658         }
2659         spin_unlock(&j->lock);
2660
2661         bch2_journal_flush_seq_async(j, seq, parent);
2662 }
2663
2664 int bch2_journal_flush(struct journal *j)
2665 {
2666         u64 seq, journal_seq;
2667
2668         spin_lock(&j->lock);
2669         journal_seq = atomic64_read(&j->seq);
2670
2671         if (journal_entry_is_open(j)) {
2672                 seq = journal_seq;
2673         } else if (journal_seq) {
2674                 seq = journal_seq - 1;
2675         } else {
2676                 spin_unlock(&j->lock);
2677                 return 0;
2678         }
2679         spin_unlock(&j->lock);
2680
2681         return bch2_journal_flush_seq(j, seq);
2682 }
2683
2684 ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
2685 {
2686         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2687         union journal_res_state *s = &j->reservations;
2688         struct bch_dev *ca;
2689         unsigned iter;
2690         ssize_t ret = 0;
2691
2692         rcu_read_lock();
2693         spin_lock(&j->lock);
2694
2695         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2696                          "active journal entries:\t%zu\n"
2697                          "seq:\t\t\t%llu\n"
2698                          "last_seq:\t\t%llu\n"
2699                          "last_seq_ondisk:\t%llu\n"
2700                          "reservation count:\t%u\n"
2701                          "reservation offset:\t%u\n"
2702                          "current entry u64s:\t%u\n"
2703                          "io in flight:\t\t%i\n"
2704                          "need write:\t\t%i\n"
2705                          "dirty:\t\t\t%i\n"
2706                          "replay done:\t\t%i\n",
2707                          fifo_used(&j->pin),
2708                          (u64) atomic64_read(&j->seq),
2709                          last_seq(j),
2710                          j->last_seq_ondisk,
2711                          journal_state_count(*s, s->idx),
2712                          s->cur_entry_offset,
2713                          j->cur_entry_u64s,
2714                          s->prev_buf_unwritten,
2715                          test_bit(JOURNAL_NEED_WRITE,   &j->flags),
2716                          journal_entry_is_open(j),
2717                          test_bit(JOURNAL_REPLAY_DONE,  &j->flags));
2718
2719         for_each_member_device_rcu(ca, c, iter,
2720                                    &c->rw_devs[BCH_DATA_JOURNAL]) {
2721                 struct journal_device *ja = &ca->journal;
2722
2723                 if (!ja->nr)
2724                         continue;
2725
2726                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2727                                  "dev %u:\n"
2728                                  "\tnr\t\t%u\n"
2729                                  "\tcur_idx\t\t%u (seq %llu)\n"
2730                                  "\tlast_idx\t%u (seq %llu)\n",
2731                                  iter, ja->nr,
2732                                  ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
2733                                  ja->last_idx,  ja->bucket_seq[ja->last_idx]);
2734         }
2735
2736         spin_unlock(&j->lock);
2737         rcu_read_unlock();
2738
2739         return ret;
2740 }
2741
2742 ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
2743 {
2744         struct journal_entry_pin_list *pin_list;
2745         struct journal_entry_pin *pin;
2746         ssize_t ret = 0;
2747         unsigned i;
2748
2749         spin_lock_irq(&j->pin_lock);
2750         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
2751                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2752                                  "%llu: count %u\n",
2753                                  journal_pin_seq(j, pin_list),
2754                                  atomic_read(&pin_list->count));
2755
2756                 list_for_each_entry(pin, &pin_list->list, list)
2757                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2758                                          "\t%p %pf\n",
2759                                          pin, pin->flush);
2760
2761                 if (!list_empty(&pin_list->flushed))
2762                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2763                                          "flushed:\n");
2764
2765                 list_for_each_entry(pin, &pin_list->flushed, list)
2766                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2767                                          "\t%p %pf\n",
2768                                          pin, pin->flush);
2769         }
2770         spin_unlock_irq(&j->pin_lock);
2771
2772         return ret;
2773 }
2774
2775 static bool bch2_journal_writing_to_device(struct bch_dev *ca)
2776 {
2777         struct journal *j = &ca->fs->journal;
2778         bool ret;
2779
2780         spin_lock(&j->lock);
2781         ret = bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key),
2782                                     ca->dev_idx);
2783         spin_unlock(&j->lock);
2784
2785         return ret;
2786 }
2787
2788 /*
2789  * This asumes that ca has already been marked read-only so that
2790  * journal_next_bucket won't pick buckets out of ca any more.
2791  * Hence, if the journal is not currently pointing to ca, there
2792  * will be no new writes to journal entries in ca after all the
2793  * pending ones have been flushed to disk.
2794  *
2795  * If the journal is being written to ca, write a new record, and
2796  * journal_next_bucket will notice that the device is no longer
2797  * writeable and pick a new set of devices to write to.
2798  */
2799
2800 int bch2_journal_move(struct bch_dev *ca)
2801 {
2802         struct journal_device *ja = &ca->journal;
2803         struct journal *j = &ca->fs->journal;
2804         u64 seq_to_flush = 0;
2805         unsigned i;
2806         int ret;
2807
2808         if (bch2_journal_writing_to_device(ca)) {
2809                 /*
2810                  * bch_journal_meta will write a record and we'll wait
2811                  * for the write to complete.
2812                  * Actually writing the journal (journal_write_locked)
2813                  * will call journal_next_bucket which notices that the
2814                  * device is no longer writeable, and picks a new one.
2815                  */
2816                 bch2_journal_meta(j);
2817                 BUG_ON(bch2_journal_writing_to_device(ca));
2818         }
2819
2820         for (i = 0; i < ja->nr; i++)
2821                 seq_to_flush = max(seq_to_flush, ja->bucket_seq[i]);
2822
2823         bch2_journal_flush_pins(j, seq_to_flush);
2824
2825         /*
2826          * Force a meta-data journal entry to be written so that
2827          * we have newer journal entries in devices other than ca,
2828          * and wait for the meta data write to complete.
2829          */
2830         bch2_journal_meta(j);
2831
2832         /*
2833          * Verify that we no longer need any of the journal entries in
2834          * the device
2835          */
2836         spin_lock(&j->lock);
2837         ret = j->last_seq_ondisk > seq_to_flush ? 0 : -EIO;
2838         spin_unlock(&j->lock);
2839
2840         return ret;
2841 }
2842
2843 void bch2_fs_journal_stop(struct journal *j)
2844 {
2845         if (!test_bit(JOURNAL_STARTED, &j->flags))
2846                 return;
2847
2848         /*
2849          * Empty out the journal by first flushing everything pinning existing
2850          * journal entries, then force a brand new empty journal entry to be
2851          * written:
2852          */
2853         bch2_journal_flush_pins(j, U64_MAX);
2854         bch2_journal_flush_async(j, NULL);
2855         bch2_journal_meta(j);
2856
2857         cancel_delayed_work_sync(&j->write_work);
2858         cancel_delayed_work_sync(&j->reclaim_work);
2859 }
2860
2861 void bch2_dev_journal_exit(struct bch_dev *ca)
2862 {
2863         kfree(ca->journal.bio);
2864         kfree(ca->journal.buckets);
2865         kfree(ca->journal.bucket_seq);
2866
2867         ca->journal.bio         = NULL;
2868         ca->journal.buckets     = NULL;
2869         ca->journal.bucket_seq  = NULL;
2870 }
2871
2872 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
2873 {
2874         struct journal_device *ja = &ca->journal;
2875         struct bch_sb_field_journal *journal_buckets =
2876                 bch2_sb_get_journal(sb);
2877         unsigned i;
2878
2879         ja->nr = bch2_nr_journal_buckets(journal_buckets);
2880
2881         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2882         if (!ja->bucket_seq)
2883                 return -ENOMEM;
2884
2885         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
2886                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
2887         if (!ca->journal.bio)
2888                 return -ENOMEM;
2889
2890         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2891         if (!ja->buckets)
2892                 return -ENOMEM;
2893
2894         for (i = 0; i < ja->nr; i++)
2895                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
2896
2897         return 0;
2898 }
2899
2900 void bch2_fs_journal_exit(struct journal *j)
2901 {
2902         kvpfree(j->buf[1].data, j->buf[1].size);
2903         kvpfree(j->buf[0].data, j->buf[0].size);
2904         free_fifo(&j->pin);
2905 }
2906
2907 int bch2_fs_journal_init(struct journal *j)
2908 {
2909         static struct lock_class_key res_key;
2910
2911         spin_lock_init(&j->lock);
2912         spin_lock_init(&j->pin_lock);
2913         init_waitqueue_head(&j->wait);
2914         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
2915         INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
2916         mutex_init(&j->blacklist_lock);
2917         INIT_LIST_HEAD(&j->seq_blacklist);
2918         mutex_init(&j->reclaim_lock);
2919
2920         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
2921
2922         j->buf[0].size          = JOURNAL_ENTRY_SIZE_MIN;
2923         j->buf[1].size          = JOURNAL_ENTRY_SIZE_MIN;
2924         j->write_delay_ms       = 100;
2925         j->reclaim_delay_ms     = 100;
2926
2927         bkey_extent_init(&j->key);
2928
2929         atomic64_set(&j->reservations.counter,
2930                 ((union journal_res_state)
2931                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
2932
2933         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
2934             !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
2935             !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL)))
2936                 return -ENOMEM;
2937
2938         j->pin.front = j->pin.back = 1;
2939
2940         return 0;
2941 }