]> git.sesse.net Git - bcachefs-tools-debian/blob - libbcachefs/journal.c
Rename from bcache-tools to bcachefs-tools
[bcachefs-tools-debian] / libbcachefs / journal.c
1 /*
2  * bcachefs journalling code, for btree insertions
3  *
4  * Copyright 2012 Google, Inc.
5  */
6
7 #include "bcachefs.h"
8 #include "alloc.h"
9 #include "bkey_methods.h"
10 #include "buckets.h"
11 #include "btree_gc.h"
12 #include "btree_update.h"
13 #include "btree_io.h"
14 #include "checksum.h"
15 #include "debug.h"
16 #include "error.h"
17 #include "extents.h"
18 #include "io.h"
19 #include "keylist.h"
20 #include "journal.h"
21 #include "super-io.h"
22 #include "vstructs.h"
23
24 #include <trace/events/bcachefs.h>
25
26 static void journal_write(struct closure *);
27 static void journal_reclaim_fast(struct journal *);
28 static void journal_pin_add_entry(struct journal *,
29                                   struct journal_entry_pin_list *,
30                                   struct journal_entry_pin *,
31                                   journal_pin_flush_fn);
32
33 static inline struct journal_buf *journal_cur_buf(struct journal *j)
34 {
35         return j->buf + j->reservations.idx;
36 }
37
38 static inline struct journal_buf *journal_prev_buf(struct journal *j)
39 {
40         return j->buf + !j->reservations.idx;
41 }
42
43 /* Sequence number of oldest dirty journal entry */
44
45 static inline u64 last_seq(struct journal *j)
46 {
47         return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1;
48 }
49
50 static inline u64 journal_pin_seq(struct journal *j,
51                                   struct journal_entry_pin_list *pin_list)
52 {
53         return last_seq(j) + fifo_entry_idx(&j->pin, pin_list);
54 }
55
56 static inline struct jset_entry *__jset_entry_type_next(struct jset *jset,
57                                         struct jset_entry *entry, unsigned type)
58 {
59         while (entry < vstruct_last(jset)) {
60                 if (JOURNAL_ENTRY_TYPE(entry) == type)
61                         return entry;
62
63                 entry = vstruct_next(entry);
64         }
65
66         return NULL;
67 }
68
/*
 * Walk every entry of @jset whose type is @type, using @entry as the cursor.
 * The loop ends when __jset_entry_type_next() returns NULL.
 */
#define for_each_jset_entry_type(entry, jset, type)                     \
        for ((entry) = (jset)->start;                                   \
             ((entry) = __jset_entry_type_next((jset), (entry),         \
                                               (type))) != NULL;        \
             (entry) = vstruct_next(entry))

/*
 * Walk every key @k inside every JOURNAL_ENTRY_BTREE_KEYS entry of @jset;
 * @_n is the lookahead cursor required by vstruct_for_each_safe().
 */
#define for_each_jset_key(k, _n, entry, jset)                           \
        for_each_jset_entry_type(entry, jset, JOURNAL_ENTRY_BTREE_KEYS) \
                vstruct_for_each_safe(entry, k, _n)
77
78 static inline void bch2_journal_add_entry(struct journal_buf *buf,
79                                          const void *data, size_t u64s,
80                                          unsigned type, enum btree_id id,
81                                          unsigned level)
82 {
83         struct jset *jset = buf->data;
84
85         bch2_journal_add_entry_at(buf, data, u64s, type, id, level,
86                                  le32_to_cpu(jset->u64s));
87         le32_add_cpu(&jset->u64s, jset_u64s(u64s));
88 }
89
90 static struct jset_entry *bch2_journal_find_entry(struct jset *j, unsigned type,
91                                                  enum btree_id id)
92 {
93         struct jset_entry *entry;
94
95         for_each_jset_entry_type(entry, j, type)
96                 if (entry->btree_id == id)
97                         return entry;
98
99         return NULL;
100 }
101
102 struct bkey_i *bch2_journal_find_btree_root(struct bch_fs *c, struct jset *j,
103                                            enum btree_id id, unsigned *level)
104 {
105         struct bkey_i *k;
106         struct jset_entry *entry =
107                 bch2_journal_find_entry(j, JOURNAL_ENTRY_BTREE_ROOT, id);
108
109         if (!entry)
110                 return NULL;
111
112         k = entry->start;
113         *level = entry->level;
114         *level = entry->level;
115         return k;
116 }
117
118 static void bch2_journal_add_btree_root(struct journal_buf *buf,
119                                        enum btree_id id, struct bkey_i *k,
120                                        unsigned level)
121 {
122         bch2_journal_add_entry(buf, k, k->k.u64s,
123                               JOURNAL_ENTRY_BTREE_ROOT, id, level);
124 }
125
126 static inline void bch2_journal_add_prios(struct journal *j,
127                                          struct journal_buf *buf)
128 {
129         /*
130          * no prio bucket ptrs yet... XXX should change the allocator so this
131          * can't happen:
132          */
133         if (!buf->nr_prio_buckets)
134                 return;
135
136         bch2_journal_add_entry(buf, j->prio_buckets, buf->nr_prio_buckets,
137                               JOURNAL_ENTRY_PRIO_PTRS, 0, 0);
138 }
139
140 static void journal_seq_blacklist_flush(struct journal *j,
141                                         struct journal_entry_pin *pin)
142 {
143         struct bch_fs *c =
144                 container_of(j, struct bch_fs, journal);
145         struct journal_seq_blacklist *bl =
146                 container_of(pin, struct journal_seq_blacklist, pin);
147         struct blacklisted_node n;
148         struct closure cl;
149         unsigned i;
150         int ret;
151
152         closure_init_stack(&cl);
153
154         for (i = 0;; i++) {
155                 struct btree_iter iter;
156                 struct btree *b;
157
158                 mutex_lock(&j->blacklist_lock);
159                 if (i >= bl->nr_entries) {
160                         mutex_unlock(&j->blacklist_lock);
161                         break;
162                 }
163                 n = bl->entries[i];
164                 mutex_unlock(&j->blacklist_lock);
165
166                 bch2_btree_iter_init(&iter, c, n.btree_id, n.pos);
167                 iter.is_extents = false;
168 redo_peek:
169                 b = bch2_btree_iter_peek_node(&iter);
170
171                 /* The node might have already been rewritten: */
172
173                 if (b->data->keys.seq == n.seq &&
174                     !bkey_cmp(b->key.k.p, n.pos)) {
175                         ret = bch2_btree_node_rewrite(&iter, b, &cl);
176                         if (ret) {
177                                 bch2_btree_iter_unlock(&iter);
178                                 closure_sync(&cl);
179
180                                 if (ret == -EAGAIN ||
181                                     ret == -EINTR)
182                                         goto redo_peek;
183
184                                 /* -EROFS or perhaps -ENOSPC - bail out: */
185                                 /* XXX warn here */
186                                 return;
187                         }
188                 }
189
190                 bch2_btree_iter_unlock(&iter);
191         }
192
193         closure_sync(&cl);
194
195         for (i = 0;; i++) {
196                 struct btree_interior_update *as;
197                 struct pending_btree_node_free *d;
198
199                 mutex_lock(&j->blacklist_lock);
200                 if (i >= bl->nr_entries) {
201                         mutex_unlock(&j->blacklist_lock);
202                         break;
203                 }
204                 n = bl->entries[i];
205                 mutex_unlock(&j->blacklist_lock);
206 redo_wait:
207                 mutex_lock(&c->btree_interior_update_lock);
208
209                 /*
210                  * Is the node on the list of pending interior node updates -
211                  * being freed? If so, wait for that to finish:
212                  */
213                 for_each_pending_btree_node_free(c, as, d)
214                         if (n.seq       == d->seq &&
215                             n.btree_id  == d->btree_id &&
216                             !d->level &&
217                             !bkey_cmp(n.pos, d->key.k.p)) {
218                                 closure_wait(&as->wait, &cl);
219                                 mutex_unlock(&c->btree_interior_update_lock);
220                                 closure_sync(&cl);
221                                 goto redo_wait;
222                         }
223
224                 mutex_unlock(&c->btree_interior_update_lock);
225         }
226
227         mutex_lock(&j->blacklist_lock);
228
229         bch2_journal_pin_drop(j, &bl->pin);
230         list_del(&bl->list);
231         kfree(bl->entries);
232         kfree(bl);
233
234         mutex_unlock(&j->blacklist_lock);
235 }
236
237 static struct journal_seq_blacklist *
238 journal_seq_blacklist_find(struct journal *j, u64 seq)
239 {
240         struct journal_seq_blacklist *bl;
241
242         lockdep_assert_held(&j->blacklist_lock);
243
244         list_for_each_entry(bl, &j->seq_blacklist, list)
245                 if (seq == bl->seq)
246                         return bl;
247
248         return NULL;
249 }
250
251 static struct journal_seq_blacklist *
252 bch2_journal_seq_blacklisted_new(struct journal *j, u64 seq)
253 {
254         struct journal_seq_blacklist *bl;
255
256         lockdep_assert_held(&j->blacklist_lock);
257
258         bl = kzalloc(sizeof(*bl), GFP_KERNEL);
259         if (!bl)
260                 return NULL;
261
262         bl->seq = seq;
263         list_add_tail(&bl->list, &j->seq_blacklist);
264         return bl;
265 }
266
267 /*
268  * Returns true if @seq is newer than the most recent journal entry that got
269  * written, and data corresponding to @seq should be ignored - also marks @seq
270  * as blacklisted so that on future restarts the corresponding data will still
271  * be ignored:
272  */
273 int bch2_journal_seq_should_ignore(struct bch_fs *c, u64 seq, struct btree *b)
274 {
275         struct journal *j = &c->journal;
276         struct journal_seq_blacklist *bl = NULL;
277         struct blacklisted_node *n;
278         u64 journal_seq, i;
279         int ret = 0;
280
281         if (!seq)
282                 return 0;
283
284         journal_seq = atomic64_read(&j->seq);
285
286         /* Interier updates aren't journalled: */
287         BUG_ON(b->level);
288         BUG_ON(seq > journal_seq && test_bit(BCH_FS_INITIAL_GC_DONE, &c->flags));
289
290         if (seq <= journal_seq) {
291                 if (list_empty_careful(&j->seq_blacklist))
292                         return 0;
293
294                 mutex_lock(&j->blacklist_lock);
295                 ret = journal_seq_blacklist_find(j, seq) != NULL;
296                 mutex_unlock(&j->blacklist_lock);
297                 return ret;
298         }
299
300         /*
301          * Decrease this back to j->seq + 2 when we next rev the on disk format:
302          * increasing it temporarily to work around bug in old kernels
303          */
304         bch2_fs_inconsistent_on(seq > journal_seq + 4, c,
305                          "bset journal seq too far in the future: %llu > %llu",
306                          seq, journal_seq);
307
308         bch_verbose(c, "btree node %u:%llu:%llu has future journal sequence number %llu, blacklisting",
309                     b->btree_id, b->key.k.p.inode, b->key.k.p.offset, seq);
310
311         /*
312          * When we start the journal, bch2_journal_start() will skip over @seq:
313          */
314
315         mutex_lock(&j->blacklist_lock);
316
317         for (i = journal_seq + 1; i <= seq; i++) {
318                 bl = journal_seq_blacklist_find(j, i) ?:
319                         bch2_journal_seq_blacklisted_new(j, i);
320
321                 if (!bl) {
322                         ret = -ENOMEM;
323                         goto out;
324                 }
325         }
326
327         for (n = bl->entries; n < bl->entries + bl->nr_entries; n++)
328                 if (b->data->keys.seq   == n->seq &&
329                     b->btree_id         == n->btree_id &&
330                     !bkey_cmp(b->key.k.p, n->pos))
331                         goto found_entry;
332
333         if (!bl->nr_entries ||
334             is_power_of_2(bl->nr_entries)) {
335                 n = krealloc(bl->entries,
336                              max(bl->nr_entries * 2, 8UL) * sizeof(*n),
337                              GFP_KERNEL);
338                 if (!n) {
339                         ret = -ENOMEM;
340                         goto out;
341                 }
342                 bl->entries = n;
343         }
344
345         bl->entries[bl->nr_entries++] = (struct blacklisted_node) {
346                 .seq            = b->data->keys.seq,
347                 .btree_id       = b->btree_id,
348                 .pos            = b->key.k.p,
349         };
350 found_entry:
351         ret = 1;
352 out:
353         mutex_unlock(&j->blacklist_lock);
354         return ret;
355 }
356
357 /*
358  * Journal replay/recovery:
359  *
360  * This code is all driven from bch2_fs_start(); we first read the journal
361  * entries, do some other stuff, then we mark all the keys in the journal
362  * entries (same as garbage collection would), then we replay them - reinserting
363  * them into the cache in precisely the same order as they appear in the
364  * journal.
365  *
366  * We only journal keys that go in leaf nodes, which simplifies things quite a
367  * bit.
368  */
369
370 struct journal_list {
371         struct closure          cl;
372         struct mutex            lock;
373         struct list_head        *head;
374         int                     ret;
375 };
376
377 #define JOURNAL_ENTRY_ADD_OK            0
378 #define JOURNAL_ENTRY_ADD_OUT_OF_RANGE  5
379
380 /*
381  * Given a journal entry we just read, add it to the list of journal entries to
382  * be replayed:
383  */
384 static int journal_entry_add(struct bch_fs *c, struct journal_list *jlist,
385                     struct jset *j)
386 {
387         struct journal_replay *i, *pos;
388         struct list_head *where;
389         size_t bytes = vstruct_bytes(j);
390         __le64 last_seq;
391         int ret;
392
393         mutex_lock(&jlist->lock);
394
395         last_seq = !list_empty(jlist->head)
396                 ? list_last_entry(jlist->head, struct journal_replay,
397                                   list)->j.last_seq
398                 : 0;
399
400         /* Is this entry older than the range we need? */
401         if (le64_to_cpu(j->seq) < le64_to_cpu(last_seq)) {
402                 ret = JOURNAL_ENTRY_ADD_OUT_OF_RANGE;
403                 goto out;
404         }
405
406         /* Drop entries we don't need anymore */
407         list_for_each_entry_safe(i, pos, jlist->head, list) {
408                 if (le64_to_cpu(i->j.seq) >= le64_to_cpu(j->last_seq))
409                         break;
410                 list_del(&i->list);
411                 kfree(i);
412         }
413
414         list_for_each_entry_reverse(i, jlist->head, list) {
415                 /* Duplicate? */
416                 if (le64_to_cpu(j->seq) == le64_to_cpu(i->j.seq)) {
417                         fsck_err_on(bytes != vstruct_bytes(&i->j) ||
418                                     memcmp(j, &i->j, bytes), c,
419                                     "found duplicate but non identical journal entries (seq %llu)",
420                                     le64_to_cpu(j->seq));
421
422                         ret = JOURNAL_ENTRY_ADD_OK;
423                         goto out;
424                 }
425
426                 if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
427                         where = &i->list;
428                         goto add;
429                 }
430         }
431
432         where = jlist->head;
433 add:
434         i = kvmalloc(offsetof(struct journal_replay, j) + bytes, GFP_KERNEL);
435         if (!i) {
436                 ret = -ENOMEM;
437                 goto out;
438         }
439
440         memcpy(&i->j, j, bytes);
441         list_add(&i->list, where);
442         ret = JOURNAL_ENTRY_ADD_OK;
443 out:
444 fsck_err:
445         mutex_unlock(&jlist->lock);
446         return ret;
447 }
448
449 static struct nonce journal_nonce(const struct jset *jset)
450 {
451         return (struct nonce) {{
452                 [0] = 0,
453                 [1] = ((__le32 *) &jset->seq)[0],
454                 [2] = ((__le32 *) &jset->seq)[1],
455                 [3] = BCH_NONCE_JOURNAL,
456         }};
457 }
458
459 static void journal_entry_null_range(void *start, void *end)
460 {
461         struct jset_entry *entry;
462
463         for (entry = start; entry != end; entry = vstruct_next(entry)) {
464                 entry->u64s     = 0;
465                 entry->btree_id = 0;
466                 entry->level    = 0;
467                 entry->flags    = 0;
468                 SET_JOURNAL_ENTRY_TYPE(entry, 0);
469         }
470 }
471
472 static int journal_validate_key(struct bch_fs *c, struct jset *j,
473                                 struct jset_entry *entry,
474                                 struct bkey_i *k, enum bkey_type key_type,
475                                 const char *type)
476 {
477         void *next = vstruct_next(entry);
478         const char *invalid;
479         char buf[160];
480         int ret = 0;
481
482         if (mustfix_fsck_err_on(!k->k.u64s, c,
483                         "invalid %s in journal: k->u64s 0", type)) {
484                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
485                 journal_entry_null_range(vstruct_next(entry), next);
486                 return 0;
487         }
488
489         if (mustfix_fsck_err_on((void *) bkey_next(k) >
490                                 (void *) vstruct_next(entry), c,
491                         "invalid %s in journal: extends past end of journal entry",
492                         type)) {
493                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
494                 journal_entry_null_range(vstruct_next(entry), next);
495                 return 0;
496         }
497
498         if (mustfix_fsck_err_on(k->k.format != KEY_FORMAT_CURRENT, c,
499                         "invalid %s in journal: bad format %u",
500                         type, k->k.format)) {
501                 le16_add_cpu(&entry->u64s, -k->k.u64s);
502                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
503                 journal_entry_null_range(vstruct_next(entry), next);
504                 return 0;
505         }
506
507         if (JSET_BIG_ENDIAN(j) != CPU_BIG_ENDIAN)
508                 bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
509
510         invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
511         if (invalid) {
512                 bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
513                                      bkey_i_to_s_c(k));
514                 mustfix_fsck_err(c, "invalid %s in journal: %s", type, buf);
515
516                 le16_add_cpu(&entry->u64s, -k->k.u64s);
517                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
518                 journal_entry_null_range(vstruct_next(entry), next);
519                 return 0;
520         }
521 fsck_err:
522         return ret;
523 }
524
525 #define JOURNAL_ENTRY_REREAD    5
526 #define JOURNAL_ENTRY_NONE      6
527 #define JOURNAL_ENTRY_BAD       7
528
529 static int journal_entry_validate(struct bch_fs *c,
530                                   struct jset *j, u64 sector,
531                                   unsigned bucket_sectors_left,
532                                   unsigned sectors_read)
533 {
534         struct jset_entry *entry;
535         size_t bytes = vstruct_bytes(j);
536         struct bch_csum csum;
537         int ret = 0;
538
539         if (le64_to_cpu(j->magic) != jset_magic(c))
540                 return JOURNAL_ENTRY_NONE;
541
542         if (le32_to_cpu(j->version) != BCACHE_JSET_VERSION) {
543                 bch_err(c, "unknown journal entry version %u",
544                         le32_to_cpu(j->version));
545                 return BCH_FSCK_UNKNOWN_VERSION;
546         }
547
548         if (mustfix_fsck_err_on(bytes > bucket_sectors_left << 9, c,
549                         "journal entry too big (%zu bytes), sector %lluu",
550                         bytes, sector)) {
551                 /* XXX: note we might have missing journal entries */
552                 return JOURNAL_ENTRY_BAD;
553         }
554
555         if (bytes > sectors_read << 9)
556                 return JOURNAL_ENTRY_REREAD;
557
558         if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(j)), c,
559                         "journal entry with unknown csum type %llu sector %lluu",
560                         JSET_CSUM_TYPE(j), sector))
561                 return JOURNAL_ENTRY_BAD;
562
563         csum = csum_vstruct(c, JSET_CSUM_TYPE(j), journal_nonce(j), j);
564         if (mustfix_fsck_err_on(bch2_crc_cmp(csum, j->csum), c,
565                         "journal checksum bad, sector %llu", sector)) {
566                 /* XXX: retry IO, when we start retrying checksum errors */
567                 /* XXX: note we might have missing journal entries */
568                 return JOURNAL_ENTRY_BAD;
569         }
570
571         bch2_encrypt(c, JSET_CSUM_TYPE(j), journal_nonce(j),
572                     j->encrypted_start,
573                     vstruct_end(j) - (void *) j->encrypted_start);
574
575         if (mustfix_fsck_err_on(le64_to_cpu(j->last_seq) > le64_to_cpu(j->seq), c,
576                         "invalid journal entry: last_seq > seq"))
577                 j->last_seq = j->seq;
578
579         vstruct_for_each(j, entry) {
580                 struct bkey_i *k;
581
582                 if (mustfix_fsck_err_on(vstruct_next(entry) >
583                                         vstruct_last(j), c,
584                                 "journal entry extents past end of jset")) {
585                         j->u64s = cpu_to_le64((u64 *) entry - j->_data);
586                         break;
587                 }
588
589                 switch (JOURNAL_ENTRY_TYPE(entry)) {
590                 case JOURNAL_ENTRY_BTREE_KEYS:
591                         vstruct_for_each(entry, k) {
592                                 ret = journal_validate_key(c, j, entry, k,
593                                                 bkey_type(entry->level,
594                                                           entry->btree_id),
595                                                 "key");
596                                 if (ret)
597                                         goto fsck_err;
598                         }
599                         break;
600
601                 case JOURNAL_ENTRY_BTREE_ROOT:
602                         k = entry->start;
603
604                         if (mustfix_fsck_err_on(!entry->u64s ||
605                                         le16_to_cpu(entry->u64s) != k->k.u64s, c,
606                                         "invalid btree root journal entry: wrong number of keys")) {
607                                 journal_entry_null_range(entry,
608                                                 vstruct_next(entry));
609                                 continue;
610                         }
611
612                         ret = journal_validate_key(c, j, entry, k,
613                                                    BKEY_TYPE_BTREE, "btree root");
614                         if (ret)
615                                 goto fsck_err;
616                         break;
617
618                 case JOURNAL_ENTRY_PRIO_PTRS:
619                         break;
620
621                 case JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED:
622                         if (mustfix_fsck_err_on(le16_to_cpu(entry->u64s) != 1, c,
623                                 "invalid journal seq blacklist entry: bad size")) {
624                                 journal_entry_null_range(entry,
625                                                 vstruct_next(entry));
626                         }
627
628                         break;
629                 default:
630                         mustfix_fsck_err(c, "invalid journal entry type %llu",
631                                  JOURNAL_ENTRY_TYPE(entry));
632                         journal_entry_null_range(entry, vstruct_next(entry));
633                         break;
634                 }
635         }
636
637 fsck_err:
638         return ret;
639 }
640
641 struct journal_read_buf {
642         void            *data;
643         size_t          size;
644 };
645
646 static int journal_read_buf_realloc(struct journal_read_buf *b,
647                                     size_t new_size)
648 {
649         void *n;
650
651         new_size = roundup_pow_of_two(new_size);
652         n = (void *) __get_free_pages(GFP_KERNEL, get_order(new_size));
653         if (!n)
654                 return -ENOMEM;
655
656         free_pages((unsigned long) b->data, get_order(b->size));
657         b->data = n;
658         b->size = new_size;
659         return 0;
660 }
661
662 static int journal_read_bucket(struct bch_dev *ca,
663                                struct journal_read_buf *buf,
664                                struct journal_list *jlist,
665                                unsigned bucket, u64 *seq, bool *entries_found)
666 {
667         struct bch_fs *c = ca->fs;
668         struct journal_device *ja = &ca->journal;
669         struct bio *bio = ja->bio;
670         struct jset *j = NULL;
671         unsigned sectors, sectors_read = 0;
672         u64 offset = bucket_to_sector(ca, ja->buckets[bucket]),
673             end = offset + ca->mi.bucket_size;
674         bool saw_bad = false;
675         int ret = 0;
676
677         pr_debug("reading %u", bucket);
678
679         while (offset < end) {
680                 if (!sectors_read) {
681 reread:                 sectors_read = min_t(unsigned,
682                                 end - offset, buf->size >> 9);
683
684                         bio_reset(bio);
685                         bio->bi_bdev            = ca->disk_sb.bdev;
686                         bio->bi_iter.bi_sector  = offset;
687                         bio->bi_iter.bi_size    = sectors_read << 9;
688                         bio_set_op_attrs(bio, REQ_OP_READ, 0);
689                         bch2_bio_map(bio, buf->data);
690
691                         ret = submit_bio_wait(bio);
692
693                         if (bch2_dev_fatal_io_err_on(ret, ca,
694                                                   "journal read from sector %llu",
695                                                   offset) ||
696                             bch2_meta_read_fault("journal"))
697                                 return -EIO;
698
699                         j = buf->data;
700                 }
701
702                 ret = journal_entry_validate(c, j, offset,
703                                         end - offset, sectors_read);
704                 switch (ret) {
705                 case BCH_FSCK_OK:
706                         break;
707                 case JOURNAL_ENTRY_REREAD:
708                         if (vstruct_bytes(j) > buf->size) {
709                                 ret = journal_read_buf_realloc(buf,
710                                                         vstruct_bytes(j));
711                                 if (ret)
712                                         return ret;
713                         }
714                         goto reread;
715                 case JOURNAL_ENTRY_NONE:
716                         if (!saw_bad)
717                                 return 0;
718                         sectors = c->sb.block_size;
719                         goto next_block;
720                 case JOURNAL_ENTRY_BAD:
721                         saw_bad = true;
722                         sectors = c->sb.block_size;
723                         goto next_block;
724                 default:
725                         return ret;
726                 }
727
728                 /*
729                  * This happens sometimes if we don't have discards on -
730                  * when we've partially overwritten a bucket with new
731                  * journal entries. We don't need the rest of the
732                  * bucket:
733                  */
734                 if (le64_to_cpu(j->seq) < ja->bucket_seq[bucket])
735                         return 0;
736
737                 ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
738
739                 ret = journal_entry_add(c, jlist, j);
740                 switch (ret) {
741                 case JOURNAL_ENTRY_ADD_OK:
742                         *entries_found = true;
743                         break;
744                 case JOURNAL_ENTRY_ADD_OUT_OF_RANGE:
745                         break;
746                 default:
747                         return ret;
748                 }
749
750                 if (le64_to_cpu(j->seq) > *seq)
751                         *seq = le64_to_cpu(j->seq);
752
753                 sectors = vstruct_sectors(j, c->block_bits);
754 next_block:
755                 pr_debug("next");
756                 offset          += sectors;
757                 sectors_read    -= sectors;
758                 j = ((void *) j) + (sectors << 9);
759         }
760
761         return 0;
762 }
763
764 static void bch2_journal_read_device(struct closure *cl)
765 {
766 #define read_bucket(b)                                                  \
767         ({                                                              \
768                 bool entries_found = false;                             \
769                 ret = journal_read_bucket(ca, &buf, jlist, b, &seq,     \
770                                           &entries_found);              \
771                 if (ret)                                                \
772                         goto err;                                       \
773                 __set_bit(b, bitmap);                                   \
774                 entries_found;                                          \
775          })
776
777         struct journal_device *ja =
778                 container_of(cl, struct journal_device, read);
779         struct bch_dev *ca = container_of(ja, struct bch_dev, journal);
780         struct journal_list *jlist =
781                 container_of(cl->parent, struct journal_list, cl);
782         struct request_queue *q = bdev_get_queue(ca->disk_sb.bdev);
783         struct journal_read_buf buf = { NULL, 0 };
784
785         DECLARE_BITMAP(bitmap, ja->nr);
786         unsigned i, l, r;
787         u64 seq = 0;
788         int ret;
789
790         if (!ja->nr)
791                 goto out;
792
793         bitmap_zero(bitmap, ja->nr);
794         ret = journal_read_buf_realloc(&buf, PAGE_SIZE);
795         if (ret)
796                 goto err;
797
798         pr_debug("%u journal buckets", ja->nr);
799
800         /*
801          * If the device supports discard but not secure discard, we can't do
802          * the fancy fibonacci hash/binary search because the live journal
803          * entries might not form a contiguous range:
804          */
805         for (i = 0; i < ja->nr; i++)
806                 read_bucket(i);
807         goto search_done;
808
809         if (!blk_queue_nonrot(q))
810                 goto linear_scan;
811
812         /*
813          * Read journal buckets ordered by golden ratio hash to quickly
814          * find a sequence of buckets with valid journal entries
815          */
816         for (i = 0; i < ja->nr; i++) {
817                 l = (i * 2654435769U) % ja->nr;
818
819                 if (test_bit(l, bitmap))
820                         break;
821
822                 if (read_bucket(l))
823                         goto bsearch;
824         }
825
826         /*
827          * If that fails, check all the buckets we haven't checked
828          * already
829          */
830         pr_debug("falling back to linear search");
831 linear_scan:
832         for (l = find_first_zero_bit(bitmap, ja->nr);
833              l < ja->nr;
834              l = find_next_zero_bit(bitmap, ja->nr, l + 1))
835                 if (read_bucket(l))
836                         goto bsearch;
837
838         /* no journal entries on this device? */
839         if (l == ja->nr)
840                 goto out;
841 bsearch:
842         /* Binary search */
843         r = find_next_bit(bitmap, ja->nr, l + 1);
844         pr_debug("starting binary search, l %u r %u", l, r);
845
846         while (l + 1 < r) {
847                 unsigned m = (l + r) >> 1;
848                 u64 cur_seq = seq;
849
850                 read_bucket(m);
851
852                 if (cur_seq != seq)
853                         l = m;
854                 else
855                         r = m;
856         }
857
858 search_done:
859         /*
860          * Find the journal bucket with the highest sequence number:
861          *
862          * If there's duplicate journal entries in multiple buckets (which
863          * definitely isn't supposed to happen, but...) - make sure to start
864          * cur_idx at the last of those buckets, so we don't deadlock trying to
865          * allocate
866          */
867         seq = 0;
868
869         for (i = 0; i < ja->nr; i++)
870                 if (ja->bucket_seq[i] >= seq &&
871                     ja->bucket_seq[i] != ja->bucket_seq[(i + 1) % ja->nr]) {
872                         /*
873                          * When journal_next_bucket() goes to allocate for
874                          * the first time, it'll use the bucket after
875                          * ja->cur_idx
876                          */
877                         ja->cur_idx = i;
878                         seq = ja->bucket_seq[i];
879                 }
880
881         /*
882          * Set last_idx to indicate the entire journal is full and needs to be
883          * reclaimed - journal reclaim will immediately reclaim whatever isn't
884          * pinned when it first runs:
885          */
886         ja->last_idx = (ja->cur_idx + 1) % ja->nr;
887
888         /*
889          * Read buckets in reverse order until we stop finding more journal
890          * entries:
891          */
892         for (i = (ja->cur_idx + ja->nr - 1) % ja->nr;
893              i != ja->cur_idx;
894              i = (i + ja->nr - 1) % ja->nr)
895                 if (!test_bit(i, bitmap) &&
896                     !read_bucket(i))
897                         break;
898 out:
899         free_pages((unsigned long) buf.data, get_order(buf.size));
900         percpu_ref_put(&ca->io_ref);
901         closure_return(cl);
902 err:
903         mutex_lock(&jlist->lock);
904         jlist->ret = ret;
905         mutex_unlock(&jlist->lock);
906         goto out;
907 #undef read_bucket
908 }
909
910 void bch2_journal_entries_free(struct list_head *list)
911 {
912
913         while (!list_empty(list)) {
914                 struct journal_replay *i =
915                         list_first_entry(list, struct journal_replay, list);
916                 list_del(&i->list);
917                 kvfree(i);
918         }
919 }
920
921 static int journal_seq_blacklist_read(struct journal *j,
922                                       struct journal_replay *i,
923                                       struct journal_entry_pin_list *p)
924 {
925         struct bch_fs *c = container_of(j, struct bch_fs, journal);
926         struct jset_entry *entry;
927         struct journal_seq_blacklist *bl;
928         u64 seq;
929
930         for_each_jset_entry_type(entry, &i->j,
931                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
932                 seq = le64_to_cpu(entry->_data[0]);
933
934                 bch_verbose(c, "blacklisting existing journal seq %llu", seq);
935
936                 bl = bch2_journal_seq_blacklisted_new(j, seq);
937                 if (!bl)
938                         return -ENOMEM;
939
940                 journal_pin_add_entry(j, p, &bl->pin,
941                                   journal_seq_blacklist_flush);
942                 bl->written = true;
943         }
944
945         return 0;
946 }
947
948 static inline bool journal_has_keys(struct list_head *list)
949 {
950         struct journal_replay *i;
951         struct jset_entry *entry;
952         struct bkey_i *k, *_n;
953
954         list_for_each_entry(i, list, list)
955                 for_each_jset_key(k, _n, entry, &i->j)
956                         return true;
957
958         return false;
959 }
960
/*
 * bch2_journal_read - read the on-disk journal and set up in-memory state
 *
 * Starts bch2_journal_read_device() on every readable member device, with the
 * entries collected on @list, then initializes the journal sequence number,
 * the pin FIFO, the sequence blacklist and the prio bucket pointers from what
 * was read.
 *
 * Returns 0 on success; the error recorded in jlist.ret if a device read
 * failed; BCH_FSCK_REPAIR_IMPOSSIBLE if no entries were found; -ENOMEM if a
 * blacklist record couldn't be allocated.
 *
 * NOTE(review): the fsck_err_on()/unfixable_fsck_err_on() macros are defined
 * elsewhere; they presumably may set 'ret' and jump to the fsck_err label -
 * confirm against error.h.
 */
961 int bch2_journal_read(struct bch_fs *c, struct list_head *list)
962 {
963         struct jset_entry *prio_ptrs;
964         struct journal_list jlist;
965         struct journal_replay *i;
966         struct jset *j;
967         struct journal_entry_pin_list *p;
968         struct bch_dev *ca;
969         u64 cur_seq, end_seq;
970         unsigned iter;
971         int ret = 0;
972
973         closure_init_stack(&jlist.cl);
974         mutex_init(&jlist.lock);
975         jlist.head = list;
976         jlist.ret = 0;
977
             /*
              * One read per readable device, queued on system_unbound_wq;
              * the io_ref taken here is dropped by bch2_journal_read_device()
              * when it finishes.
              */
978         for_each_readable_member(ca, c, iter) {
979                 percpu_ref_get(&ca->io_ref);
980                 closure_call(&ca->journal.read,
981                              bch2_journal_read_device,
982                              system_unbound_wq,
983                              &jlist.cl);
984         }
985
             /* Wait for all of the per-device reads to complete */
986         closure_sync(&jlist.cl);
987
988         if (jlist.ret)
989                 return jlist.ret;
990
991         if (list_empty(list)){
992                 bch_err(c, "no journal entries found");
993                 return BCH_FSCK_REPAIR_IMPOSSIBLE;
994         }
995
996         fsck_err_on(c->sb.clean && journal_has_keys(list), c,
997                     "filesystem marked clean but journal has keys to replay");
998
             /* j is the jset of the last entry on the list */
999         j = &list_entry(list->prev, struct journal_replay, list)->j;
1000
              /*
               * The pin FIFO needs one slot per sequence number in
               * [j->last_seq, j->seq]:
               */
1001         unfixable_fsck_err_on(le64_to_cpu(j->seq) -
1002                         le64_to_cpu(j->last_seq) + 1 >
1003                         c->journal.pin.size, c,
1004                         "too many journal entries open for refcount fifo");
1005
1006         c->journal.pin.back = le64_to_cpu(j->seq) -
1007                 le64_to_cpu(j->last_seq) + 1;
1008
1009         atomic64_set(&c->journal.seq, le64_to_cpu(j->seq));
1010         c->journal.last_seq_ondisk = le64_to_cpu(j->last_seq);
1011
1012         BUG_ON(last_seq(&c->journal) != le64_to_cpu(j->last_seq));
1013
1014         i = list_first_entry(list, struct journal_replay, list);
1015
1016         mutex_lock(&c->journal.blacklist_lock);
1017
              /*
               * Walk the pin FIFO and the entry list in step: a pin list whose
               * sequence number matches the next entry on the list starts with
               * a count of 1 and has that entry's blacklist records loaded;
               * every other pin list starts with a count of 0.
               */
1018         fifo_for_each_entry_ptr(p, &c->journal.pin, iter) {
1019                 u64 seq = journal_pin_seq(&c->journal, p);
1020
1021                 INIT_LIST_HEAD(&p->list);
1022
1023                 if (i && le64_to_cpu(i->j.seq) == seq) {
1024                         atomic_set(&p->count, 1);
1025
1026                         if (journal_seq_blacklist_read(&c->journal, i, p)) {
1027                                 mutex_unlock(&c->journal.blacklist_lock);
1028                                 return -ENOMEM;
1029                         }
1030
                              /* advance to the next entry, or NULL at the end */
1031                         i = list_is_last(&i->list, list)
1032                                 ? NULL
1033                                 : list_next_entry(i, list);
1034                 } else {
1035                         atomic_set(&p->count, 0);
1036                 }
1037         }
1038
1039         mutex_unlock(&c->journal.blacklist_lock);
1040
1041         cur_seq = last_seq(&c->journal);
1042         end_seq = le64_to_cpu(list_last_entry(list,
1043                                 struct journal_replay, list)->j.seq);
1044
              /*
               * Check the entries for gaps: cur_seq is the next sequence number
               * we expect to see. Blacklisted sequence numbers below the
               * current entry's are skipped over before comparing.
               */
1045         list_for_each_entry(i, list, list) {
1046                 bool blacklisted;
1047
1048                 mutex_lock(&c->journal.blacklist_lock);
1049                 while (cur_seq < le64_to_cpu(i->j.seq) &&
1050                        journal_seq_blacklist_find(&c->journal, cur_seq))
1051                         cur_seq++;
1052
1053                 blacklisted = journal_seq_blacklist_find(&c->journal,
1054                                                          le64_to_cpu(i->j.seq));
1055                 mutex_unlock(&c->journal.blacklist_lock);
1056
1057                 fsck_err_on(blacklisted, c,
1058                             "found blacklisted journal entry %llu",
1059                             le64_to_cpu(i->j.seq));
1060
1061                 fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
1062                         "journal entries %llu-%llu missing! (replaying %llu-%llu)",
1063                         cur_seq, le64_to_cpu(i->j.seq) - 1,
1064                         last_seq(&c->journal), end_seq);
1065
1066                 cur_seq = le64_to_cpu(i->j.seq) + 1;
1067         }
1068
              /*
               * Copy the prio bucket pointers out of the last entry (j), if it
               * has any.
               * NOTE(review): prio_ptrs->u64s comes from disk and is not
               * checked here against the size of c->journal.prio_buckets -
               * confirm it is validated when the entry is read.
               */
1069         prio_ptrs = bch2_journal_find_entry(j, JOURNAL_ENTRY_PRIO_PTRS, 0);
1070         if (prio_ptrs) {
1071                 memcpy_u64s(c->journal.prio_buckets,
1072                             prio_ptrs->_data,
1073                             le16_to_cpu(prio_ptrs->u64s));
1074                 c->journal.nr_prio_buckets = le16_to_cpu(prio_ptrs->u64s);
1075         }
1076 fsck_err:
1077         return ret;
1078 }
1079
1080 void bch2_journal_mark(struct bch_fs *c, struct list_head *list)
1081 {
1082         struct bkey_i *k, *n;
1083         struct jset_entry *j;
1084         struct journal_replay *r;
1085
1086         list_for_each_entry(r, list, list)
1087                 for_each_jset_key(k, n, j, &r->j) {
1088                         enum bkey_type type = bkey_type(j->level, j->btree_id);
1089                         struct bkey_s_c k_s_c = bkey_i_to_s_c(k);
1090
1091                         if (btree_type_has_ptrs(type))
1092                                 bch2_btree_mark_key_initial(c, type, k_s_c);
1093                 }
1094 }
1095
1096 static bool journal_entry_is_open(struct journal *j)
1097 {
1098         return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
1099 }
1100
1101 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
1102 {
1103         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1104
1105         if (!need_write_just_set &&
1106             test_bit(JOURNAL_NEED_WRITE, &j->flags))
1107                 __bch2_time_stats_update(j->delay_time,
1108                                         j->need_write_time);
1109 #if 0
1110         closure_call(&j->io, journal_write, NULL, &c->cl);
1111 #else
1112         /* Shut sparse up: */
1113         closure_init(&j->io, &c->cl);
1114         set_closure_fn(&j->io, journal_write, NULL);
1115         journal_write(&j->io);
1116 #endif
1117 }
1118
1119 static void __bch2_journal_next_entry(struct journal *j)
1120 {
1121         struct journal_entry_pin_list pin_list, *p;
1122         struct journal_buf *buf;
1123
1124         /*
1125          * The fifo_push() needs to happen at the same time as j->seq is
1126          * incremented for last_seq() to be calculated correctly
1127          */
1128         atomic64_inc(&j->seq);
1129         BUG_ON(!fifo_push(&j->pin, pin_list));
1130         p = &fifo_peek_back(&j->pin);
1131
1132         INIT_LIST_HEAD(&p->list);
1133         atomic_set(&p->count, 1);
1134
1135         if (test_bit(JOURNAL_REPLAY_DONE, &j->flags)) {
1136                 smp_wmb();
1137                 j->cur_pin_list = p;
1138         }
1139
1140         buf = journal_cur_buf(j);
1141         memset(buf->has_inode, 0, sizeof(buf->has_inode));
1142
1143         memset(buf->data, 0, sizeof(*buf->data));
1144         buf->data->seq  = cpu_to_le64(atomic64_read(&j->seq));
1145         buf->data->u64s = 0;
1146
1147         BUG_ON(journal_pin_seq(j, p) != atomic64_read(&j->seq));
1148 }
1149
1150 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
1151 {
1152         unsigned ret = BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1153
1154         if (buf->nr_prio_buckets)
1155                 ret += JSET_KEYS_U64s + buf->nr_prio_buckets;
1156
1157         return ret;
1158 }
1159
/*
 * journal_buf_switch - close the currently open journal entry and switch to
 * the other journal buffer
 *
 * The success path drops j->lock (spin_unlock() below), so the caller is
 * expected to hold it on entry.
 *
 * Returns:
 *   JOURNAL_ENTRY_CLOSED - the entry was already closed; nothing done
 *   JOURNAL_ENTRY_ERROR  - the journal is in the error state; nothing done
 *   JOURNAL_ENTRY_INUSE  - the previous buffer is still unwritten, so we
 *                          can't switch yet; nothing done
 *   JOURNAL_UNLOCKED     - the entry was closed and the next one set up;
 *                          j->lock has been dropped
 *
 * For the first three return values j->lock is not touched.
 */
1160 static enum {
1161         JOURNAL_ENTRY_ERROR,
1162         JOURNAL_ENTRY_INUSE,
1163         JOURNAL_ENTRY_CLOSED,
1164         JOURNAL_UNLOCKED,
1165 } journal_buf_switch(struct journal *j, bool need_write_just_set)
1166 {
1167         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1168         struct journal_buf *buf;
1169         union journal_res_state old, new;
1170         u64 v = atomic64_read(&j->reservations.counter);
1171
              /*
               * Atomically mark the current entry closed, flip to the other
               * buffer index and flag the old buffer as unwritten; retry if
               * the reservation state changed underneath us.
               */
1172         do {
1173                 old.v = new.v = v;
1174                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
1175                         return JOURNAL_ENTRY_CLOSED;
1176
1177                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1178                         return JOURNAL_ENTRY_ERROR;
1179
1180                 if (new.prev_buf_unwritten)
1181                         return JOURNAL_ENTRY_INUSE;
1182
1183                 /*
1184                  * avoid race between setting buf->data->u64s and
1185                  * journal_res_put starting write:
1186                  */
1187                 journal_state_inc(&new);
1188
1189                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
1190                 new.idx++;
1191                 new.prev_buf_unwritten = 1;
1192
                      /* the buffer we're switching to must have no users */
1193                 BUG_ON(journal_state_count(new, new.idx));
1194         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1195                                        old.v, new.v)) != old.v);
1196
1197         journal_reclaim_fast(j);
1198
1199         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
1200
              /*
               * Finalize the header of the entry we just closed: its size is
               * the offset reservations had reached, and last_seq is sampled
               * now.
               */
1201         buf = &j->buf[old.idx];
1202         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
1203         buf->data->last_seq     = cpu_to_le64(last_seq(j));
1204
              /*
               * Size of the closed entry in sectors, including the u64s
               * reserved by journal_entry_u64s_reserve():
               */
1205         j->prev_buf_sectors =
1206                 vstruct_blocks_plus(buf->data, c->block_bits,
1207                                     journal_entry_u64s_reserve(buf)) *
1208                 c->sb.block_size;
1209
1210         BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
1211
              /*
               * Drop the count the closed entry's pin list was created with,
               * then set up the next entry:
               */
1212         atomic_dec_bug(&fifo_peek_back(&j->pin).count);
1213         __bch2_journal_next_entry(j);
1214
1215         cancel_delayed_work(&j->write_work);
1216         spin_unlock(&j->lock);
1217
1218         if (c->bucket_journal_seq > 1 << 14) {
1219                 c->bucket_journal_seq = 0;
1220                 bch2_bucket_seq_cleanup(c);
1221         }
1222
1223         /* ugh - might be called from __journal_res_get() under wait_event() */
1224         __set_current_state(TASK_RUNNING);
              /*
               * NOTE(review): presumably this drops the reference taken with
               * journal_state_inc() above - confirm in journal.h.
               */
1225         bch2_journal_buf_put(j, old.idx, need_write_just_set);
1226
1227         return JOURNAL_UNLOCKED;
1228 }
1229
1230 void bch2_journal_halt(struct journal *j)
1231 {
1232         union journal_res_state old, new;
1233         u64 v = atomic64_read(&j->reservations.counter);
1234
1235         do {
1236                 old.v = new.v = v;
1237                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1238                         return;
1239
1240                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
1241         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1242                                        old.v, new.v)) != old.v);
1243
1244         wake_up(&j->wait);
1245         closure_wake_up(&journal_cur_buf(j)->wait);
1246         closure_wake_up(&journal_prev_buf(j)->wait);
1247 }
1248
1249 static unsigned journal_dev_buckets_available(struct journal *j,
1250                                               struct bch_dev *ca)
1251 {
1252         struct journal_device *ja = &ca->journal;
1253         unsigned next = (ja->cur_idx + 1) % ja->nr;
1254         unsigned available = (ja->last_idx + ja->nr - next) % ja->nr;
1255
1256         /*
1257          * Hack to avoid a deadlock during journal replay:
1258          * journal replay might require setting a new btree
1259          * root, which requires writing another journal entry -
1260          * thus, if the journal is full (and this happens when
1261          * replaying the first journal bucket's entries) we're
1262          * screwed.
1263          *
1264          * So don't let the journal fill up unless we're in
1265          * replay:
1266          */
1267         if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
1268                 available = max((int) available - 2, 0);
1269
1270         /*
1271          * Don't use the last bucket unless writing the new last_seq
1272          * will make another bucket available:
1273          */
1274         if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
1275                 available = max((int) available - 1, 0);
1276
1277         return available;
1278 }
1279
1280 /* returns number of sectors available for next journal entry: */
1281 static int journal_entry_sectors(struct journal *j)
1282 {
1283         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1284         struct bch_dev *ca;
1285         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
1286         unsigned sectors_available = j->entry_size_max >> 9;
1287         unsigned i, nr_online = 0, nr_devs = 0;
1288
1289         lockdep_assert_held(&j->lock);
1290
1291         spin_lock(&j->devs.lock);
1292         group_for_each_dev(ca, &j->devs, i) {
1293                 unsigned buckets_required = 0;
1294
1295                 sectors_available = min_t(unsigned, sectors_available,
1296                                           ca->mi.bucket_size);
1297
1298                 /*
1299                  * Note that we don't allocate the space for a journal entry
1300                  * until we write it out - thus, if we haven't started the write
1301                  * for the previous entry we have to make sure we have space for
1302                  * it too:
1303                  */
1304                 if (bch2_extent_has_device(e.c, ca->dev_idx)) {
1305                         if (j->prev_buf_sectors > ca->journal.sectors_free)
1306                                 buckets_required++;
1307
1308                         if (j->prev_buf_sectors + sectors_available >
1309                             ca->journal.sectors_free)
1310                                 buckets_required++;
1311                 } else {
1312                         if (j->prev_buf_sectors + sectors_available >
1313                             ca->mi.bucket_size)
1314                                 buckets_required++;
1315
1316                         buckets_required++;
1317                 }
1318
1319                 if (journal_dev_buckets_available(j, ca) >= buckets_required)
1320                         nr_devs++;
1321                 nr_online++;
1322         }
1323         spin_unlock(&j->devs.lock);
1324
1325         if (nr_online < c->opts.metadata_replicas_required)
1326                 return -EROFS;
1327
1328         if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
1329                 return 0;
1330
1331         return sectors_available;
1332 }
1333
1334 /*
1335  * should _only_ called from journal_res_get() - when we actually want a
1336  * journal reservation - journal entry is open means journal is dirty:
1337  */
1338 static int journal_entry_open(struct journal *j)
1339 {
1340         struct journal_buf *buf = journal_cur_buf(j);
1341         ssize_t u64s;
1342         int ret = 0, sectors;
1343
1344         lockdep_assert_held(&j->lock);
1345         BUG_ON(journal_entry_is_open(j));
1346
1347         if (!fifo_free(&j->pin))
1348                 return 0;
1349
1350         sectors = journal_entry_sectors(j);
1351         if (sectors <= 0)
1352                 return sectors;
1353
1354         j->cur_buf_sectors      = sectors;
1355         buf->nr_prio_buckets    = j->nr_prio_buckets;
1356
1357         u64s = (sectors << 9) / sizeof(u64);
1358
1359         /* Subtract the journal header */
1360         u64s -= sizeof(struct jset) / sizeof(u64);
1361         /*
1362          * Btree roots, prio pointers don't get added until right before we do
1363          * the write:
1364          */
1365         u64s -= journal_entry_u64s_reserve(buf);
1366         u64s  = max_t(ssize_t, 0L, u64s);
1367
1368         BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
1369
1370         if (u64s > le32_to_cpu(buf->data->u64s)) {
1371                 union journal_res_state old, new;
1372                 u64 v = atomic64_read(&j->reservations.counter);
1373
1374                 /*
1375                  * Must be set before marking the journal entry as open:
1376                  */
1377                 j->cur_entry_u64s = u64s;
1378
1379                 do {
1380                         old.v = new.v = v;
1381
1382                         if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1383                                 return false;
1384
1385                         /* Handle any already added entries */
1386                         new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
1387                 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1388                                                old.v, new.v)) != old.v);
1389                 ret = 1;
1390
1391                 wake_up(&j->wait);
1392
1393                 if (j->res_get_blocked_start) {
1394                         __bch2_time_stats_update(j->blocked_time,
1395                                                 j->res_get_blocked_start);
1396                         j->res_get_blocked_start = 0;
1397                 }
1398
1399                 mod_delayed_work(system_freezable_wq,
1400                                  &j->write_work,
1401                                  msecs_to_jiffies(j->write_delay_ms));
1402         }
1403
1404         return ret;
1405 }
1406
1407 void bch2_journal_start(struct bch_fs *c)
1408 {
1409         struct journal *j = &c->journal;
1410         struct journal_seq_blacklist *bl;
1411         u64 new_seq = 0;
1412
1413         list_for_each_entry(bl, &j->seq_blacklist, list)
1414                 new_seq = max(new_seq, bl->seq);
1415
1416         spin_lock(&j->lock);
1417
1418         set_bit(JOURNAL_STARTED, &j->flags);
1419
1420         while (atomic64_read(&j->seq) < new_seq) {
1421                 struct journal_entry_pin_list pin_list, *p;
1422
1423                 BUG_ON(!fifo_push(&j->pin, pin_list));
1424                 p = &fifo_peek_back(&j->pin);
1425
1426                 INIT_LIST_HEAD(&p->list);
1427                 atomic_set(&p->count, 0);
1428                 atomic64_inc(&j->seq);
1429         }
1430
1431         /*
1432          * journal_buf_switch() only inits the next journal entry when it
1433          * closes an open journal entry - the very first journal entry gets
1434          * initialized here:
1435          */
1436         __bch2_journal_next_entry(j);
1437
1438         /*
1439          * Adding entries to the next journal entry before allocating space on
1440          * disk for the next journal entry - this is ok, because these entries
1441          * only have to go down with the next journal entry we write:
1442          */
1443         list_for_each_entry(bl, &j->seq_blacklist, list)
1444                 if (!bl->written) {
1445                         bch2_journal_add_entry(journal_cur_buf(j), &bl->seq, 1,
1446                                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED,
1447                                         0, 0);
1448
1449                         journal_pin_add_entry(j,
1450                                               &fifo_peek_back(&j->pin),
1451                                               &bl->pin,
1452                                               journal_seq_blacklist_flush);
1453                         bl->written = true;
1454                 }
1455
1456         spin_unlock(&j->lock);
1457
1458         queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
1459 }
1460
1461 int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
1462 {
1463         int ret = 0, keys = 0, entries = 0;
1464         struct journal *j = &c->journal;
1465         struct bkey_i *k, *_n;
1466         struct jset_entry *entry;
1467         struct journal_replay *i, *n;
1468
1469         list_for_each_entry_safe(i, n, list, list) {
1470                 j->cur_pin_list =
1471                         &j->pin.data[((j->pin.back - 1 -
1472                                        (atomic64_read(&j->seq) -
1473                                         le64_to_cpu(i->j.seq))) &
1474                                       j->pin.mask)];
1475
1476                 for_each_jset_key(k, _n, entry, &i->j) {
1477                         struct disk_reservation disk_res;
1478
1479                         /*
1480                          * We might cause compressed extents to be split, so we
1481                          * need to pass in a disk_reservation:
1482                          */
1483                         BUG_ON(bch2_disk_reservation_get(c, &disk_res, 0, 0));
1484
1485                         ret = bch2_btree_insert(c, entry->btree_id, k,
1486                                                &disk_res, NULL, NULL,
1487                                                BTREE_INSERT_NOFAIL|
1488                                                BTREE_INSERT_JOURNAL_REPLAY);
1489                         bch2_disk_reservation_put(c, &disk_res);
1490
1491                         if (ret)
1492                                 goto err;
1493
1494                         cond_resched();
1495                         keys++;
1496                 }
1497
1498                 if (atomic_dec_and_test(&j->cur_pin_list->count))
1499                         wake_up(&j->wait);
1500
1501                 entries++;
1502         }
1503
1504         if (keys) {
1505                 bch2_btree_flush(c);
1506
1507                 /*
1508                  * Write a new journal entry _before_ we start journalling new data -
1509                  * otherwise, we could end up with btree node bsets with journal seqs
1510                  * arbitrarily far in the future vs. the most recently written journal
1511                  * entry on disk, if we crash before writing the next journal entry:
1512                  */
1513                 ret = bch2_journal_meta(&c->journal);
1514                 if (ret)
1515                         goto err;
1516         }
1517
1518         bch_info(c, "journal replay done, %i keys in %i entries, seq %llu",
1519                  keys, entries, (u64) atomic64_read(&j->seq));
1520
1521         bch2_journal_set_replay_done(&c->journal);
1522 err:
1523         if (ret)
1524                 bch_err(c, "journal replay error: %d", ret);
1525
1526         bch2_journal_entries_free(list);
1527
1528         return ret;
1529 }
1530
1531 #if 0
1532 /*
1533  * Allocate more journal space at runtime - not currently making use of it, but
1534  * the code works:
      *
      * Grows the journal on @ca to @nr buckets; a request for no more buckets
      * than the device already has is a no-op that returns 0.
      *
      * Returns 0 on success, -ENOSPC if the disk reservation can't be obtained,
      * -ENOMEM if the new arrays or the superblock journal field can't be
      * allocated.
1535  */
1536 static int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
1537                                       unsigned nr)
1538 {
1539         struct journal *j = &c->journal;
1540         struct journal_device *ja = &ca->journal;
1541         struct bch_sb_field_journal *journal_buckets;
1542         struct disk_reservation disk_res = { 0, 0 };
1543         struct closure cl;
1544         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
1545         int ret = 0;
1546
1547         closure_init_stack(&cl);
1548
1549         /* don't handle reducing nr of buckets yet: */
1550         if (nr <= ja->nr)
1551                 return 0;
1552
1553         /*
1554          * note: journal buckets aren't really counted as _sectors_ used yet, so
1555          * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
1556          * when space used goes up without a reservation - but we do need the
1557          * reservation to ensure we'll actually be able to allocate:
1558          */
1559
1560         if (bch2_disk_reservation_get(c, &disk_res,
1561                         (nr - ja->nr) << ca->bucket_bits, 0))
1562                 return -ENOSPC;
1563
1564         mutex_lock(&c->sb_lock);
1565
              /* ret stays -ENOMEM for every failure between here and "ret = 0" */
1566         ret = -ENOMEM;
1567         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1568         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1569         if (!new_buckets || !new_bucket_seq)
1570                 goto err;
1571
1572         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1573                                 nr + sizeof(*journal_buckets) / sizeof(u64));
1574         if (!journal_buckets)
1575                 goto err;
1576
              /*
               * Copy the existing contents into the larger arrays and swap them
               * in; after the swap new_buckets/new_bucket_seq hold the old
               * arrays, which are what gets freed at the end.
               */
1577         spin_lock(&j->lock);
1578         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
1579         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
1580         swap(new_buckets,       ja->buckets);
1581         swap(new_bucket_seq,    ja->bucket_seq);
1582
1583         while (ja->nr < nr) {
1584                 /* must happen under journal lock, to avoid racing with gc: */
1585                 u64 b = bch2_bucket_alloc(ca, RESERVE_NONE);
1586                 if (!b) {
                              /*
                               * No bucket available: drop the journal lock while
                               * waiting on the freelist, then retry.
                               * NOTE(review): exact closure_wait() return
                               * semantics are defined elsewhere - confirm.
                               */
1587                         if (!closure_wait(&c->freelist_wait, &cl)) {
1588                                 spin_unlock(&j->lock);
1589                                 closure_sync(&cl);
1590                                 spin_lock(&j->lock);
1591                         }
1592                         continue;
1593                 }
1594
1595                 bch2_mark_metadata_bucket(ca, &ca->buckets[b],
1596                                          BUCKET_JOURNAL, false);
1597                 bch2_mark_alloc_bucket(ca, &ca->buckets[b], false);
1598
                      /*
                       * Open up a one-slot gap at last_idx in all three parallel
                       * arrays (in-memory buckets, their sequence numbers and the
                       * superblock copy), and put the new bucket there:
                       */
1599                 memmove(ja->buckets + ja->last_idx + 1,
1600                         ja->buckets + ja->last_idx,
1601                         (ja->nr - ja->last_idx) * sizeof(u64));
1602                 memmove(ja->bucket_seq + ja->last_idx + 1,
1603                         ja->bucket_seq + ja->last_idx,
1604                         (ja->nr - ja->last_idx) * sizeof(u64));
1605                 memmove(journal_buckets->buckets + ja->last_idx + 1,
1606                         journal_buckets->buckets + ja->last_idx,
1607                         (ja->nr - ja->last_idx) * sizeof(u64));
1608
1609                 ja->buckets[ja->last_idx] = b;
1610                 journal_buckets->buckets[ja->last_idx] = cpu_to_le64(b);
1611
                      /*
                       * Buckets at or after the insertion point moved up by one,
                       * so shift the indices that refer to them:
                       */
1612                 if (ja->last_idx < ja->nr) {
1613                         if (ja->cur_idx >= ja->last_idx)
1614                                 ja->cur_idx++;
1615                         ja->last_idx++;
1616                 }
1617                 ja->nr++;
1618
1619         }
1620         spin_unlock(&j->lock);
1621
1622         BUG_ON(bch2_validate_journal_layout(ca->disk_sb.sb, ca->mi));
1623
1624         bch2_write_super(c);
1625
1626         ret = 0;
1627 err:
1628         mutex_unlock(&c->sb_lock);
1629
1630         kfree(new_bucket_seq);
1631         kfree(new_buckets);
1632         bch2_disk_reservation_put(c, &disk_res);
1633
1634         return ret;
1635 }
1636 #endif
1637
1638 int bch2_dev_journal_alloc(struct bch_dev *ca)
1639 {
1640         struct journal_device *ja = &ca->journal;
1641         struct bch_sb_field_journal *journal_buckets;
1642         unsigned i, nr;
1643         u64 b, *p;
1644
1645         if (dynamic_fault("bcachefs:add:journal_alloc"))
1646                 return -ENOMEM;
1647
1648         /*
1649          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
1650          * is smaller:
1651          */
1652         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
1653                      BCH_JOURNAL_BUCKETS_MIN,
1654                      min(1 << 10,
1655                          (1 << 20) / ca->mi.bucket_size));
1656
1657         p = krealloc(ja->bucket_seq, nr * sizeof(u64),
1658                      GFP_KERNEL|__GFP_ZERO);
1659         if (!p)
1660                 return -ENOMEM;
1661
1662         ja->bucket_seq = p;
1663
1664         p = krealloc(ja->buckets, nr * sizeof(u64),
1665                      GFP_KERNEL|__GFP_ZERO);
1666         if (!p)
1667                 return -ENOMEM;
1668
1669         ja->buckets = p;
1670
1671         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1672                                 nr + sizeof(*journal_buckets) / sizeof(u64));
1673         if (!journal_buckets)
1674                 return -ENOMEM;
1675
1676         for (i = 0, b = ca->mi.first_bucket;
1677              i < nr && b < ca->mi.nbuckets; b++) {
1678                 if (!is_available_bucket(ca->buckets[b].mark))
1679                         continue;
1680
1681                 bch2_mark_metadata_bucket(ca, &ca->buckets[b],
1682                                          BUCKET_JOURNAL, true);
1683                 ja->buckets[i] = b;
1684                 journal_buckets->buckets[i] = cpu_to_le64(b);
1685                 i++;
1686         }
1687
1688         if (i < nr)
1689                 return -ENOSPC;
1690
1691         BUG_ON(bch2_validate_journal_layout(ca->disk_sb.sb, ca->mi));
1692
1693         ja->nr = nr;
1694
1695         return 0;
1696 }
1697
1698 /* Journalling */
1699
1700 /**
1701  * journal_reclaim_fast - do the fast part of journal reclaim
1702  *
1703  * Called from IO submission context, does not block. Cleans up after btree
1704  * write completions by advancing the journal pin and each cache's last_idx,
1705  * kicking off discards and background reclaim as necessary.
1706  */
1707 static void journal_reclaim_fast(struct journal *j)
1708 {
1709         struct journal_entry_pin_list temp;
1710         bool popped = false;
1711
1712         lockdep_assert_held(&j->lock);
1713
1714         /*
1715          * Unpin journal entries whose reference counts reached zero, meaning
1716          * all btree nodes got written out
1717          */
1718         while (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1719                 BUG_ON(!list_empty(&fifo_peek_front(&j->pin).list));
1720                 BUG_ON(!fifo_pop(&j->pin, temp));
1721                 popped = true;
1722         }
1723
1724         if (popped)
1725                 wake_up(&j->wait);
1726 }
1727
1728 /*
1729  * Journal entry pinning - machinery for holding a reference on a given journal
1730  * entry, marking it as dirty:
1731  */
1732
1733 static inline void __journal_pin_add(struct journal *j,
1734                                      struct journal_entry_pin_list *pin_list,
1735                                      struct journal_entry_pin *pin,
1736                                      journal_pin_flush_fn flush_fn)
1737 {
1738         BUG_ON(journal_pin_active(pin));
1739
1740         atomic_inc(&pin_list->count);
1741         pin->pin_list   = pin_list;
1742         pin->flush      = flush_fn;
1743
1744         if (flush_fn)
1745                 list_add(&pin->list, &pin_list->list);
1746         else
1747                 INIT_LIST_HEAD(&pin->list);
1748 }
1749
1750 static void journal_pin_add_entry(struct journal *j,
1751                                   struct journal_entry_pin_list *pin_list,
1752                                   struct journal_entry_pin *pin,
1753                                   journal_pin_flush_fn flush_fn)
1754 {
1755         spin_lock_irq(&j->pin_lock);
1756         __journal_pin_add(j, pin_list, pin, flush_fn);
1757         spin_unlock_irq(&j->pin_lock);
1758 }
1759
1760 void bch2_journal_pin_add(struct journal *j,
1761                          struct journal_entry_pin *pin,
1762                          journal_pin_flush_fn flush_fn)
1763 {
1764         spin_lock_irq(&j->pin_lock);
1765         __journal_pin_add(j, j->cur_pin_list, pin, flush_fn);
1766         spin_unlock_irq(&j->pin_lock);
1767 }
1768
1769 static inline bool __journal_pin_drop(struct journal *j,
1770                                       struct journal_entry_pin *pin)
1771 {
1772         struct journal_entry_pin_list *pin_list = pin->pin_list;
1773
1774         pin->pin_list = NULL;
1775
1776         /* journal_reclaim_work() might have already taken us off the list */
1777         if (!list_empty_careful(&pin->list))
1778                 list_del_init(&pin->list);
1779
1780         return atomic_dec_and_test(&pin_list->count);
1781 }
1782
1783 void bch2_journal_pin_drop(struct journal *j,
1784                           struct journal_entry_pin *pin)
1785 {
1786         unsigned long flags;
1787         bool wakeup;
1788
1789         if (!journal_pin_active(pin))
1790                 return;
1791
1792         spin_lock_irqsave(&j->pin_lock, flags);
1793         wakeup = __journal_pin_drop(j, pin);
1794         spin_unlock_irqrestore(&j->pin_lock, flags);
1795
1796         /*
1797          * Unpinning a journal entry make make journal_next_bucket() succeed, if
1798          * writing a new last_seq will now make another bucket available:
1799          *
1800          * Nested irqsave is expensive, don't do the wakeup with lock held:
1801          */
1802         if (wakeup)
1803                 wake_up(&j->wait);
1804 }
1805
1806 void bch2_journal_pin_add_if_older(struct journal *j,
1807                                   struct journal_entry_pin *src_pin,
1808                                   struct journal_entry_pin *pin,
1809                                   journal_pin_flush_fn flush_fn)
1810 {
1811         spin_lock_irq(&j->pin_lock);
1812
1813         if (journal_pin_active(src_pin) &&
1814             (!journal_pin_active(pin) ||
1815              fifo_entry_idx(&j->pin, src_pin->pin_list) <
1816              fifo_entry_idx(&j->pin, pin->pin_list))) {
1817                 if (journal_pin_active(pin))
1818                         __journal_pin_drop(j, pin);
1819                 __journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
1820         }
1821
1822         spin_unlock_irq(&j->pin_lock);
1823 }
1824
1825 static struct journal_entry_pin *
1826 journal_get_next_pin(struct journal *j, u64 seq_to_flush)
1827 {
1828         struct journal_entry_pin_list *pin_list;
1829         struct journal_entry_pin *ret = NULL;
1830         unsigned iter;
1831
1832         /* so we don't iterate over empty fifo entries below: */
1833         if (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1834                 spin_lock(&j->lock);
1835                 journal_reclaim_fast(j);
1836                 spin_unlock(&j->lock);
1837         }
1838
1839         spin_lock_irq(&j->pin_lock);
1840         fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
1841                 if (journal_pin_seq(j, pin_list) > seq_to_flush)
1842                         break;
1843
1844                 ret = list_first_entry_or_null(&pin_list->list,
1845                                 struct journal_entry_pin, list);
1846                 if (ret) {
1847                         /* must be list_del_init(), see bch2_journal_pin_drop() */
1848                         list_del_init(&ret->list);
1849                         break;
1850                 }
1851         }
1852         spin_unlock_irq(&j->pin_lock);
1853
1854         return ret;
1855 }
1856
1857 static bool journal_has_pins(struct journal *j)
1858 {
1859         bool ret;
1860
1861         spin_lock(&j->lock);
1862         journal_reclaim_fast(j);
1863         ret = fifo_used(&j->pin) > 1 ||
1864                 atomic_read(&fifo_peek_front(&j->pin).count) > 1;
1865         spin_unlock(&j->lock);
1866
1867         return ret;
1868 }
1869
1870 void bch2_journal_flush_pins(struct journal *j)
1871 {
1872         struct journal_entry_pin *pin;
1873
1874         while ((pin = journal_get_next_pin(j, U64_MAX)))
1875                 pin->flush(j, pin);
1876
1877         wait_event(j->wait, !journal_has_pins(j) || bch2_journal_error(j));
1878 }
1879
1880 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
1881 {
1882         bool ret;
1883
1884         spin_lock(&j->lock);
1885         ret = ja->nr &&
1886                 (ja->last_idx != ja->cur_idx &&
1887                  ja->bucket_seq[ja->last_idx] < j->last_seq_ondisk);
1888         spin_unlock(&j->lock);
1889
1890         return ret;
1891 }
1892
1893 /**
1894  * journal_reclaim_work - free up journal buckets
1895  *
1896  * Background journal reclaim writes out btree nodes. It should be run
1897  * early enough so that we never completely run out of journal buckets.
1898  *
1899  * High watermarks for triggering background reclaim:
1900  * - FIFO has fewer than 512 entries left
1901  * - fewer than 25% journal buckets free
1902  *
1903  * Background reclaim runs until low watermarks are reached:
1904  * - FIFO has more than 1024 entries left
1905  * - more than 50% journal buckets free
1906  *
1907  * As long as a reclaim can complete in the time it takes to fill up
1908  * 512 journal entries or 25% of all journal buckets, then
1909  * journal_next_bucket() should not stall.
1910  */
1911 static void journal_reclaim_work(struct work_struct *work)
1912 {
1913         struct bch_fs *c = container_of(to_delayed_work(work),
1914                                 struct bch_fs, journal.reclaim_work);
1915         struct journal *j = &c->journal;
1916         struct bch_dev *ca;
1917         struct journal_entry_pin *pin;
1918         u64 seq_to_flush = 0;
1919         unsigned iter, bucket_to_flush;
1920         unsigned long next_flush;
1921         bool reclaim_lock_held = false, need_flush;
1922
1923         /*
1924          * Advance last_idx to point to the oldest journal entry containing
1925          * btree node updates that have not yet been written out
1926          */
1927         for_each_rw_member(ca, c, iter) {
1928                 struct journal_device *ja = &ca->journal;
1929
1930                 if (!ja->nr)
1931                         continue;
1932
1933                 while (should_discard_bucket(j, ja)) {
1934                         if (!reclaim_lock_held) {
1935                                 /*
1936                                  * ugh:
1937                                  * might be called from __journal_res_get()
1938                                  * under wait_event() - have to go back to
1939                                  * TASK_RUNNING before doing something that
1940                                  * would block, but only if we're doing work:
1941                                  */
1942                                 __set_current_state(TASK_RUNNING);
1943
1944                                 mutex_lock(&j->reclaim_lock);
1945                                 reclaim_lock_held = true;
1946                                 /* recheck under reclaim_lock: */
1947                                 continue;
1948                         }
1949
1950                         if (ca->mi.discard &&
1951                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
1952                                 blkdev_issue_discard(ca->disk_sb.bdev,
1953                                         bucket_to_sector(ca,
1954                                                 ja->buckets[ja->last_idx]),
1955                                         ca->mi.bucket_size, GFP_NOIO, 0);
1956
1957                         spin_lock(&j->lock);
1958                         ja->last_idx = (ja->last_idx + 1) % ja->nr;
1959                         spin_unlock(&j->lock);
1960
1961                         wake_up(&j->wait);
1962                 }
1963
1964                 /*
1965                  * Write out enough btree nodes to free up 50% journal
1966                  * buckets
1967                  */
1968                 spin_lock(&j->lock);
1969                 bucket_to_flush = (ja->cur_idx + (ja->nr >> 1)) % ja->nr;
1970                 seq_to_flush = max_t(u64, seq_to_flush,
1971                                      ja->bucket_seq[bucket_to_flush]);
1972                 spin_unlock(&j->lock);
1973         }
1974
1975         if (reclaim_lock_held)
1976                 mutex_unlock(&j->reclaim_lock);
1977
1978         /* Also flush if the pin fifo is more than half full */
1979         seq_to_flush = max_t(s64, seq_to_flush,
1980                              (s64) atomic64_read(&j->seq) -
1981                              (j->pin.size >> 1));
1982
1983         /*
1984          * If it's been longer than j->reclaim_delay_ms since we last flushed,
1985          * make sure to flush at least one journal pin:
1986          */
1987         next_flush = j->last_flushed + msecs_to_jiffies(j->reclaim_delay_ms);
1988         need_flush = time_after(jiffies, next_flush);
1989
1990         while ((pin = journal_get_next_pin(j, need_flush
1991                                            ? U64_MAX
1992                                            : seq_to_flush))) {
1993                 __set_current_state(TASK_RUNNING);
1994                 pin->flush(j, pin);
1995                 need_flush = false;
1996
1997                 j->last_flushed = jiffies;
1998         }
1999
2000         if (!test_bit(BCH_FS_RO, &c->flags))
2001                 queue_delayed_work(system_freezable_wq, &j->reclaim_work,
2002                                    msecs_to_jiffies(j->reclaim_delay_ms));
2003 }
2004
2005 /**
2006  * journal_next_bucket - move on to the next journal bucket if possible
2007  */
2008 static int journal_write_alloc(struct journal *j, unsigned sectors)
2009 {
2010         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2011         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
2012         struct bch_extent_ptr *ptr;
2013         struct journal_device *ja;
2014         struct bch_dev *ca;
2015         bool swapped;
2016         unsigned i, replicas, replicas_want =
2017                 READ_ONCE(c->opts.metadata_replicas);
2018
2019         spin_lock(&j->lock);
2020
2021         /*
2022          * Drop any pointers to devices that have been removed, are no longer
2023          * empty, or filled up their current journal bucket:
2024          *
2025          * Note that a device may have had a small amount of free space (perhaps
2026          * one sector) that wasn't enough for the smallest possible journal
2027          * entry - that's why we drop pointers to devices <= current free space,
2028          * i.e. whichever device was limiting the current journal entry size.
2029          */
2030         extent_for_each_ptr_backwards(e, ptr) {
2031                 ca = c->devs[ptr->dev];
2032
2033                 if (ca->mi.state != BCH_MEMBER_STATE_RW ||
2034                     ca->journal.sectors_free <= sectors)
2035                         __bch2_extent_drop_ptr(e, ptr);
2036                 else
2037                         ca->journal.sectors_free -= sectors;
2038         }
2039
2040         replicas = bch2_extent_nr_ptrs(e.c);
2041
2042         spin_lock(&j->devs.lock);
2043
2044         /* Sort by tier: */
2045         do {
2046                 swapped = false;
2047
2048                 for (i = 0; i + 1 < j->devs.nr; i++)
2049                         if (j->devs.d[i + 0].dev->mi.tier >
2050                             j->devs.d[i + 1].dev->mi.tier) {
2051                                 swap(j->devs.d[i], j->devs.d[i + 1]);
2052                                 swapped = true;
2053                         }
2054         } while (swapped);
2055
2056         /*
2057          * Pick devices for next journal write:
2058          * XXX: sort devices by free journal space?
2059          */
2060         group_for_each_dev(ca, &j->devs, i) {
2061                 ja = &ca->journal;
2062
2063                 if (replicas >= replicas_want)
2064                         break;
2065
2066                 /*
2067                  * Check that we can use this device, and aren't already using
2068                  * it:
2069                  */
2070                 if (bch2_extent_has_device(e.c, ca->dev_idx) ||
2071                     !journal_dev_buckets_available(j, ca) ||
2072                     sectors > ca->mi.bucket_size)
2073                         continue;
2074
2075                 ja->sectors_free = ca->mi.bucket_size - sectors;
2076                 ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
2077                 ja->bucket_seq[ja->cur_idx] = atomic64_read(&j->seq);
2078
2079                 extent_ptr_append(bkey_i_to_extent(&j->key),
2080                         (struct bch_extent_ptr) {
2081                                   .offset = bucket_to_sector(ca,
2082                                         ja->buckets[ja->cur_idx]),
2083                                   .dev = ca->dev_idx,
2084                 });
2085                 replicas++;
2086         }
2087         spin_unlock(&j->devs.lock);
2088
2089         j->prev_buf_sectors = 0;
2090         spin_unlock(&j->lock);
2091
2092         if (replicas < c->opts.metadata_replicas_required)
2093                 return -EROFS;
2094
2095         BUG_ON(!replicas);
2096
2097         return 0;
2098 }
2099
2100 static void journal_write_compact(struct jset *jset)
2101 {
2102         struct jset_entry *i, *next, *prev = NULL;
2103
2104         /*
2105          * Simple compaction, dropping empty jset_entries (from journal
2106          * reservations that weren't fully used) and merging jset_entries that
2107          * can be.
2108          *
2109          * If we wanted to be really fancy here, we could sort all the keys in
2110          * the jset and drop keys that were overwritten - probably not worth it:
2111          */
2112         vstruct_for_each_safe(jset, i, next) {
2113                 unsigned u64s = le16_to_cpu(i->u64s);
2114
2115                 /* Empty entry: */
2116                 if (!u64s)
2117                         continue;
2118
2119                 /* Can we merge with previous entry? */
2120                 if (prev &&
2121                     i->btree_id == prev->btree_id &&
2122                     i->level    == prev->level &&
2123                     JOURNAL_ENTRY_TYPE(i) == JOURNAL_ENTRY_TYPE(prev) &&
2124                     JOURNAL_ENTRY_TYPE(i) == JOURNAL_ENTRY_BTREE_KEYS &&
2125                     le16_to_cpu(prev->u64s) + u64s <= U16_MAX) {
2126                         memmove_u64s_down(vstruct_next(prev),
2127                                           i->_data,
2128                                           u64s);
2129                         le16_add_cpu(&prev->u64s, u64s);
2130                         continue;
2131                 }
2132
2133                 /* Couldn't merge, move i into new position (after prev): */
2134                 prev = prev ? vstruct_next(prev) : jset->start;
2135                 if (i != prev)
2136                         memmove_u64s_down(prev, i, jset_u64s(u64s));
2137         }
2138
2139         prev = prev ? vstruct_next(prev) : jset->start;
2140         jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
2141 }
2142
2143 static void journal_write_endio(struct bio *bio)
2144 {
2145         struct bch_dev *ca = bio->bi_private;
2146         struct journal *j = &ca->fs->journal;
2147
2148         if (bch2_dev_fatal_io_err_on(bio->bi_error, ca, "journal write") ||
2149             bch2_meta_write_fault("journal"))
2150                 bch2_journal_halt(j);
2151
2152         closure_put(&j->io);
2153         percpu_ref_put(&ca->io_ref);
2154 }
2155
2156 static void journal_write_done(struct closure *cl)
2157 {
2158         struct journal *j = container_of(cl, struct journal, io);
2159         struct journal_buf *w = journal_prev_buf(j);
2160
2161         j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
2162
2163         __bch2_time_stats_update(j->write_time, j->write_start_time);
2164
2165         BUG_ON(!j->reservations.prev_buf_unwritten);
2166         atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
2167                      &j->reservations.counter);
2168
2169         /*
2170          * XXX: this is racy, we could technically end up doing the wake up
2171          * after the journal_buf struct has been reused for the next write
2172          * (because we're clearing JOURNAL_IO_IN_FLIGHT) and wake up things that
2173          * are waiting on the _next_ write, not this one.
2174          *
2175          * The wake up can't come before, because journal_flush_seq_async() is
2176          * looking at JOURNAL_IO_IN_FLIGHT when it has to wait on a journal
2177          * write that was already in flight.
2178          *
2179          * The right fix is to use a lock here, but using j.lock here means it
2180          * has to be a spin_lock_irqsave() lock which then requires propagating
2181          * the irq()ness to other locks and it's all kinds of nastiness.
2182          */
2183
2184         closure_wake_up(&w->wait);
2185         wake_up(&j->wait);
2186
2187         /*
2188          * Updating last_seq_ondisk may let journal_reclaim_work() discard more
2189          * buckets:
2190          */
2191         mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
2192 }
2193
2194 static void journal_write(struct closure *cl)
2195 {
2196         struct journal *j = container_of(cl, struct journal, io);
2197         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2198         struct bch_dev *ca;
2199         struct journal_buf *w = journal_prev_buf(j);
2200         struct jset *jset = w->data;
2201         struct bio *bio;
2202         struct bch_extent_ptr *ptr;
2203         unsigned i, sectors, bytes;
2204
2205         j->write_start_time = local_clock();
2206
2207         bch2_journal_add_prios(j, w);
2208
2209         mutex_lock(&c->btree_root_lock);
2210         for (i = 0; i < BTREE_ID_NR; i++) {
2211                 struct btree_root *r = &c->btree_roots[i];
2212
2213                 if (r->alive)
2214                         bch2_journal_add_btree_root(w, i, &r->key, r->level);
2215         }
2216         mutex_unlock(&c->btree_root_lock);
2217
2218         journal_write_compact(jset);
2219
2220         jset->read_clock        = cpu_to_le16(c->prio_clock[READ].hand);
2221         jset->write_clock       = cpu_to_le16(c->prio_clock[WRITE].hand);
2222         jset->magic             = cpu_to_le64(jset_magic(c));
2223         jset->version           = cpu_to_le32(BCACHE_JSET_VERSION);
2224
2225         SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
2226         SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
2227
2228         bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
2229                     jset->encrypted_start,
2230                     vstruct_end(jset) - (void *) jset->encrypted_start);
2231
2232         jset->csum = csum_vstruct(c, JSET_CSUM_TYPE(jset),
2233                                   journal_nonce(jset), jset);
2234
2235         sectors = vstruct_sectors(jset, c->block_bits);
2236         BUG_ON(sectors > j->prev_buf_sectors);
2237
2238         bytes = vstruct_bytes(w->data);
2239         memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
2240
2241         if (journal_write_alloc(j, sectors)) {
2242                 bch2_journal_halt(j);
2243                 bch_err(c, "Unable to allocate journal write");
2244                 bch2_fatal_error(c);
2245                 closure_return_with_destructor(cl, journal_write_done);
2246         }
2247
2248         bch2_check_mark_super(c, &j->key, true);
2249
2250         /*
2251          * XXX: we really should just disable the entire journal in nochanges
2252          * mode
2253          */
2254         if (c->opts.nochanges)
2255                 goto no_io;
2256
2257         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr) {
2258                 ca = c->devs[ptr->dev];
2259                 if (!percpu_ref_tryget(&ca->io_ref)) {
2260                         /* XXX: fix this */
2261                         bch_err(c, "missing device for journal write\n");
2262                         continue;
2263                 }
2264
2265                 atomic64_add(sectors, &ca->meta_sectors_written);
2266
2267                 bio = ca->journal.bio;
2268                 bio_reset(bio);
2269                 bio->bi_iter.bi_sector  = ptr->offset;
2270                 bio->bi_bdev            = ca->disk_sb.bdev;
2271                 bio->bi_iter.bi_size    = sectors << 9;
2272                 bio->bi_end_io          = journal_write_endio;
2273                 bio->bi_private         = ca;
2274                 bio_set_op_attrs(bio, REQ_OP_WRITE,
2275                                  REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
2276                 bch2_bio_map(bio, jset);
2277
2278                 trace_journal_write(bio);
2279                 closure_bio_submit(bio, cl);
2280
2281                 ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
2282         }
2283
2284         for_each_rw_member(ca, c, i)
2285                 if (journal_flushes_device(ca) &&
2286                     !bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key), i)) {
2287                         percpu_ref_get(&ca->io_ref);
2288
2289                         bio = ca->journal.bio;
2290                         bio_reset(bio);
2291                         bio->bi_bdev            = ca->disk_sb.bdev;
2292                         bio->bi_end_io          = journal_write_endio;
2293                         bio->bi_private         = ca;
2294                         bio_set_op_attrs(bio, REQ_OP_WRITE, WRITE_FLUSH);
2295                         closure_bio_submit(bio, cl);
2296                 }
2297
2298 no_io:
2299         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr)
2300                 ptr->offset += sectors;
2301
2302         closure_return_with_destructor(cl, journal_write_done);
2303 }
2304
2305 static void journal_write_work(struct work_struct *work)
2306 {
2307         struct journal *j = container_of(to_delayed_work(work),
2308                                          struct journal, write_work);
2309         spin_lock(&j->lock);
2310         set_bit(JOURNAL_NEED_WRITE, &j->flags);
2311
2312         if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
2313                 spin_unlock(&j->lock);
2314 }
2315
2316 /*
2317  * Given an inode number, if that inode number has data in the journal that
2318  * hasn't yet been flushed, return the journal sequence number that needs to be
2319  * flushed:
2320  */
2321 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
2322 {
2323         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
2324         u64 seq = 0;
2325
2326         if (!test_bit(h, j->buf[0].has_inode) &&
2327             !test_bit(h, j->buf[1].has_inode))
2328                 return 0;
2329
2330         spin_lock(&j->lock);
2331         if (test_bit(h, journal_cur_buf(j)->has_inode))
2332                 seq = atomic64_read(&j->seq);
2333         else if (test_bit(h, journal_prev_buf(j)->has_inode))
2334                 seq = atomic64_read(&j->seq) - 1;
2335         spin_unlock(&j->lock);
2336
2337         return seq;
2338 }
2339
2340 static int __journal_res_get(struct journal *j, struct journal_res *res,
2341                               unsigned u64s_min, unsigned u64s_max)
2342 {
2343         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2344         int ret;
2345 retry:
2346         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2347         if (ret)
2348                 return ret;
2349
2350         spin_lock(&j->lock);
2351         /*
2352          * Recheck after taking the lock, so we don't race with another thread
2353          * that just did journal_entry_open() and call journal_entry_close()
2354          * unnecessarily
2355          */
2356         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2357         if (ret) {
2358                 spin_unlock(&j->lock);
2359                 return 1;
2360         }
2361
2362         /*
2363          * Ok, no more room in the current journal entry - try to start a new
2364          * one:
2365          */
2366         switch (journal_buf_switch(j, false)) {
2367         case JOURNAL_ENTRY_ERROR:
2368                 spin_unlock(&j->lock);
2369                 return -EIO;
2370         case JOURNAL_ENTRY_INUSE:
2371                 /* haven't finished writing out the previous one: */
2372                 spin_unlock(&j->lock);
2373                 trace_journal_entry_full(c);
2374                 goto blocked;
2375         case JOURNAL_ENTRY_CLOSED:
2376                 break;
2377         case JOURNAL_UNLOCKED:
2378                 goto retry;
2379         }
2380
2381         /* We now have a new, closed journal buf - see if we can open it: */
2382         ret = journal_entry_open(j);
2383         spin_unlock(&j->lock);
2384
2385         if (ret < 0)
2386                 return ret;
2387         if (ret)
2388                 goto retry;
2389
2390         /* Journal's full, we have to wait */
2391
2392         /*
2393          * Direct reclaim - can't rely on reclaim from work item
2394          * due to freezing..
2395          */
2396         journal_reclaim_work(&j->reclaim_work.work);
2397
2398         trace_journal_full(c);
2399 blocked:
2400         if (!j->res_get_blocked_start)
2401                 j->res_get_blocked_start = local_clock() ?: 1;
2402         return 0;
2403 }
2404
2405 /*
2406  * Essentially the entry function to the journaling code. When bcachefs is doing
2407  * a btree insert, it calls this function to get the current journal write.
2408  * Journal write is the structure used set up journal writes. The calling
2409  * function will then add its keys to the structure, queuing them for the next
2410  * write.
2411  *
2412  * To ensure forward progress, the current task must not be holding any
2413  * btree node write locks.
2414  */
2415 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
2416                                  unsigned u64s_min, unsigned u64s_max)
2417 {
2418         int ret;
2419
2420         wait_event(j->wait,
2421                    (ret = __journal_res_get(j, res, u64s_min,
2422                                             u64s_max)));
2423         return ret < 0 ? ret : 0;
2424 }
2425
2426 void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
2427 {
2428         spin_lock(&j->lock);
2429
2430         BUG_ON(seq > atomic64_read(&j->seq));
2431
2432         if (bch2_journal_error(j)) {
2433                 spin_unlock(&j->lock);
2434                 return;
2435         }
2436
2437         if (seq == atomic64_read(&j->seq)) {
2438                 if (!closure_wait(&journal_cur_buf(j)->wait, parent))
2439                         BUG();
2440         } else if (seq + 1 == atomic64_read(&j->seq) &&
2441                    j->reservations.prev_buf_unwritten) {
2442                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2443                         BUG();
2444
2445                 smp_mb();
2446
2447                 /* check if raced with write completion (or failure) */
2448                 if (!j->reservations.prev_buf_unwritten ||
2449                     bch2_journal_error(j))
2450                         closure_wake_up(&journal_prev_buf(j)->wait);
2451         }
2452
2453         spin_unlock(&j->lock);
2454 }
2455
2456 void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
2457 {
2458         spin_lock(&j->lock);
2459
2460         BUG_ON(seq > atomic64_read(&j->seq));
2461
2462         if (bch2_journal_error(j)) {
2463                 spin_unlock(&j->lock);
2464                 return;
2465         }
2466
2467         if (seq == atomic64_read(&j->seq)) {
2468                 bool set_need_write = false;
2469
2470                 if (parent &&
2471                     !closure_wait(&journal_cur_buf(j)->wait, parent))
2472                         BUG();
2473
2474                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2475                         j->need_write_time = local_clock();
2476                         set_need_write = true;
2477                 }
2478
2479                 switch (journal_buf_switch(j, set_need_write)) {
2480                 case JOURNAL_ENTRY_ERROR:
2481                         if (parent)
2482                                 closure_wake_up(&journal_cur_buf(j)->wait);
2483                         break;
2484                 case JOURNAL_ENTRY_CLOSED:
2485                         /*
2486                          * Journal entry hasn't been opened yet, but caller
2487                          * claims it has something (seq == j->seq):
2488                          */
2489                         BUG();
2490                 case JOURNAL_ENTRY_INUSE:
2491                         break;
2492                 case JOURNAL_UNLOCKED:
2493                         return;
2494                 }
2495         } else if (parent &&
2496                    seq + 1 == atomic64_read(&j->seq) &&
2497                    j->reservations.prev_buf_unwritten) {
2498                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2499                         BUG();
2500
2501                 smp_mb();
2502
2503                 /* check if raced with write completion (or failure) */
2504                 if (!j->reservations.prev_buf_unwritten ||
2505                     bch2_journal_error(j))
2506                         closure_wake_up(&journal_prev_buf(j)->wait);
2507         }
2508
2509         spin_unlock(&j->lock);
2510 }
2511
2512 int bch2_journal_flush_seq(struct journal *j, u64 seq)
2513 {
2514         struct closure cl;
2515         u64 start_time = local_clock();
2516
2517         closure_init_stack(&cl);
2518         bch2_journal_flush_seq_async(j, seq, &cl);
2519         closure_sync(&cl);
2520
2521         bch2_time_stats_update(j->flush_seq_time, start_time);
2522
2523         return bch2_journal_error(j);
2524 }
2525
2526 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
2527 {
2528         struct journal_res res;
2529         unsigned u64s = jset_u64s(0);
2530
2531         memset(&res, 0, sizeof(res));
2532
2533         bch2_journal_res_get(j, &res, u64s, u64s);
2534         bch2_journal_res_put(j, &res);
2535
2536         bch2_journal_flush_seq_async(j, res.seq, parent);
2537 }
2538
2539 int bch2_journal_meta(struct journal *j)
2540 {
2541         struct journal_res res;
2542         unsigned u64s = jset_u64s(0);
2543         int ret;
2544
2545         memset(&res, 0, sizeof(res));
2546
2547         ret = bch2_journal_res_get(j, &res, u64s, u64s);
2548         if (ret)
2549                 return ret;
2550
2551         bch2_journal_res_put(j, &res);
2552
2553         return bch2_journal_flush_seq(j, res.seq);
2554 }
2555
2556 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
2557 {
2558         u64 seq, journal_seq;
2559
2560         spin_lock(&j->lock);
2561         journal_seq = atomic64_read(&j->seq);
2562
2563         if (journal_entry_is_open(j)) {
2564                 seq = journal_seq;
2565         } else if (journal_seq) {
2566                 seq = journal_seq - 1;
2567         } else {
2568                 spin_unlock(&j->lock);
2569                 return;
2570         }
2571         spin_unlock(&j->lock);
2572
2573         bch2_journal_flush_seq_async(j, seq, parent);
2574 }
2575
2576 int bch2_journal_flush(struct journal *j)
2577 {
2578         u64 seq, journal_seq;
2579
2580         spin_lock(&j->lock);
2581         journal_seq = atomic64_read(&j->seq);
2582
2583         if (journal_entry_is_open(j)) {
2584                 seq = journal_seq;
2585         } else if (journal_seq) {
2586                 seq = journal_seq - 1;
2587         } else {
2588                 spin_unlock(&j->lock);
2589                 return 0;
2590         }
2591         spin_unlock(&j->lock);
2592
2593         return bch2_journal_flush_seq(j, seq);
2594 }
2595
2596 ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
2597 {
2598         union journal_res_state *s = &j->reservations;
2599         struct bch_dev *ca;
2600         unsigned iter;
2601         ssize_t ret = 0;
2602
2603         rcu_read_lock();
2604         spin_lock(&j->lock);
2605
2606         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2607                          "active journal entries:\t%zu\n"
2608                          "seq:\t\t\t%llu\n"
2609                          "last_seq:\t\t%llu\n"
2610                          "last_seq_ondisk:\t%llu\n"
2611                          "reservation count:\t%u\n"
2612                          "reservation offset:\t%u\n"
2613                          "current entry u64s:\t%u\n"
2614                          "io in flight:\t\t%i\n"
2615                          "need write:\t\t%i\n"
2616                          "dirty:\t\t\t%i\n"
2617                          "replay done:\t\t%i\n",
2618                          fifo_used(&j->pin),
2619                          (u64) atomic64_read(&j->seq),
2620                          last_seq(j),
2621                          j->last_seq_ondisk,
2622                          journal_state_count(*s, s->idx),
2623                          s->cur_entry_offset,
2624                          j->cur_entry_u64s,
2625                          s->prev_buf_unwritten,
2626                          test_bit(JOURNAL_NEED_WRITE,   &j->flags),
2627                          journal_entry_is_open(j),
2628                          test_bit(JOURNAL_REPLAY_DONE,  &j->flags));
2629
2630         spin_lock(&j->devs.lock);
2631         group_for_each_dev(ca, &j->devs, iter) {
2632                 struct journal_device *ja = &ca->journal;
2633
2634                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2635                                  "dev %u:\n"
2636                                  "\tnr\t\t%u\n"
2637                                  "\tcur_idx\t\t%u (seq %llu)\n"
2638                                  "\tlast_idx\t%u (seq %llu)\n",
2639                                  iter, ja->nr,
2640                                  ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
2641                                  ja->last_idx,  ja->bucket_seq[ja->last_idx]);
2642         }
2643         spin_unlock(&j->devs.lock);
2644
2645         spin_unlock(&j->lock);
2646         rcu_read_unlock();
2647
2648         return ret;
2649 }
2650
2651 static bool bch2_journal_writing_to_device(struct bch_dev *ca)
2652 {
2653         struct journal *j = &ca->fs->journal;
2654         bool ret;
2655
2656         spin_lock(&j->lock);
2657         ret = bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key),
2658                                     ca->dev_idx);
2659         spin_unlock(&j->lock);
2660
2661         return ret;
2662 }
2663
2664 /*
2665  * This asumes that ca has already been marked read-only so that
2666  * journal_next_bucket won't pick buckets out of ca any more.
2667  * Hence, if the journal is not currently pointing to ca, there
2668  * will be no new writes to journal entries in ca after all the
2669  * pending ones have been flushed to disk.
2670  *
2671  * If the journal is being written to ca, write a new record, and
2672  * journal_next_bucket will notice that the device is no longer
2673  * writeable and pick a new set of devices to write to.
2674  */
2675
2676 int bch2_journal_move(struct bch_dev *ca)
2677 {
2678         u64 last_flushed_seq;
2679         struct journal_device *ja = &ca->journal;
2680         struct bch_fs *c = ca->fs;
2681         struct journal *j = &c->journal;
2682         unsigned i;
2683         int ret = 0;            /* Success */
2684
2685         if (bch2_journal_writing_to_device(ca)) {
2686                 /*
2687                  * bch_journal_meta will write a record and we'll wait
2688                  * for the write to complete.
2689                  * Actually writing the journal (journal_write_locked)
2690                  * will call journal_next_bucket which notices that the
2691                  * device is no longer writeable, and picks a new one.
2692                  */
2693                 bch2_journal_meta(j);
2694                 BUG_ON(bch2_journal_writing_to_device(ca));
2695         }
2696
2697         /*
2698          * Flush all btree updates to backing store so that any
2699          * journal entries written to ca become stale and are no
2700          * longer needed.
2701          */
2702
2703         /*
2704          * XXX: switch to normal journal reclaim machinery
2705          */
2706         bch2_btree_flush(c);
2707
2708         /*
2709          * Force a meta-data journal entry to be written so that
2710          * we have newer journal entries in devices other than ca,
2711          * and wait for the meta data write to complete.
2712          */
2713         bch2_journal_meta(j);
2714
2715         /*
2716          * Verify that we no longer need any of the journal entries in
2717          * the device
2718          */
2719         spin_lock(&j->lock);
2720         last_flushed_seq = last_seq(j);
2721         spin_unlock(&j->lock);
2722
2723         for (i = 0; i < ja->nr; i += 1)
2724                 BUG_ON(ja->bucket_seq[i] > last_flushed_seq);
2725
2726         return ret;
2727 }
2728
2729 void bch2_fs_journal_stop(struct journal *j)
2730 {
2731         if (!test_bit(JOURNAL_STARTED, &j->flags))
2732                 return;
2733
2734         /*
2735          * Empty out the journal by first flushing everything pinning existing
2736          * journal entries, then force a brand new empty journal entry to be
2737          * written:
2738          */
2739         bch2_journal_flush_pins(j);
2740         bch2_journal_flush_async(j, NULL);
2741         bch2_journal_meta(j);
2742
2743         cancel_delayed_work_sync(&j->write_work);
2744         cancel_delayed_work_sync(&j->reclaim_work);
2745 }
2746
2747 void bch2_dev_journal_exit(struct bch_dev *ca)
2748 {
2749         kfree(ca->journal.bio);
2750         kfree(ca->journal.buckets);
2751         kfree(ca->journal.bucket_seq);
2752
2753         ca->journal.bio         = NULL;
2754         ca->journal.buckets     = NULL;
2755         ca->journal.bucket_seq  = NULL;
2756 }
2757
2758 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
2759 {
2760         struct journal_device *ja = &ca->journal;
2761         struct bch_sb_field_journal *journal_buckets =
2762                 bch2_sb_get_journal(sb);
2763         unsigned i, journal_entry_pages;
2764
2765         journal_entry_pages =
2766                 DIV_ROUND_UP(1U << BCH_SB_JOURNAL_ENTRY_SIZE(sb),
2767                              PAGE_SECTORS);
2768
2769         ja->nr = bch2_nr_journal_buckets(journal_buckets);
2770
2771         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2772         if (!ja->bucket_seq)
2773                 return -ENOMEM;
2774
2775         ca->journal.bio = bio_kmalloc(GFP_KERNEL, journal_entry_pages);
2776         if (!ca->journal.bio)
2777                 return -ENOMEM;
2778
2779         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2780         if (!ja->buckets)
2781                 return -ENOMEM;
2782
2783         for (i = 0; i < ja->nr; i++)
2784                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
2785
2786         return 0;
2787 }
2788
2789 void bch2_fs_journal_exit(struct journal *j)
2790 {
2791         unsigned order = get_order(j->entry_size_max);
2792
2793         free_pages((unsigned long) j->buf[1].data, order);
2794         free_pages((unsigned long) j->buf[0].data, order);
2795         free_fifo(&j->pin);
2796 }
2797
2798 int bch2_fs_journal_init(struct journal *j, unsigned entry_size_max)
2799 {
2800         static struct lock_class_key res_key;
2801         unsigned order = get_order(entry_size_max);
2802
2803         spin_lock_init(&j->lock);
2804         spin_lock_init(&j->pin_lock);
2805         init_waitqueue_head(&j->wait);
2806         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
2807         INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
2808         mutex_init(&j->blacklist_lock);
2809         INIT_LIST_HEAD(&j->seq_blacklist);
2810         spin_lock_init(&j->devs.lock);
2811         mutex_init(&j->reclaim_lock);
2812
2813         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
2814
2815         j->entry_size_max       = entry_size_max;
2816         j->write_delay_ms       = 100;
2817         j->reclaim_delay_ms     = 100;
2818
2819         bkey_extent_init(&j->key);
2820
2821         atomic64_set(&j->reservations.counter,
2822                 ((union journal_res_state)
2823                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
2824
2825         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
2826             !(j->buf[0].data = (void *) __get_free_pages(GFP_KERNEL, order)) ||
2827             !(j->buf[1].data = (void *) __get_free_pages(GFP_KERNEL, order)))
2828                 return -ENOMEM;
2829
2830         return 0;
2831 }