]> git.sesse.net Git - bcachefs-tools-debian/blob - libbcachefs/journal.c
Update bcachefs sources to da037866e6
[bcachefs-tools-debian] / libbcachefs / journal.c
1 /*
2  * bcachefs journalling code, for btree insertions
3  *
4  * Copyright 2012 Google, Inc.
5  */
6
7 #include "bcachefs.h"
8 #include "alloc.h"
9 #include "bkey_methods.h"
10 #include "buckets.h"
11 #include "btree_gc.h"
12 #include "btree_update.h"
13 #include "btree_io.h"
14 #include "checksum.h"
15 #include "debug.h"
16 #include "error.h"
17 #include "extents.h"
18 #include "io.h"
19 #include "keylist.h"
20 #include "journal.h"
21 #include "super-io.h"
22 #include "vstructs.h"
23
24 #include <trace/events/bcachefs.h>
25
26 static void journal_write(struct closure *);
27 static void journal_reclaim_fast(struct journal *);
28 static void journal_pin_add_entry(struct journal *,
29                                   struct journal_entry_pin_list *,
30                                   struct journal_entry_pin *,
31                                   journal_pin_flush_fn);
32
33 static inline struct journal_buf *journal_cur_buf(struct journal *j)
34 {
35         return j->buf + j->reservations.idx;
36 }
37
38 static inline struct journal_buf *journal_prev_buf(struct journal *j)
39 {
40         return j->buf + !j->reservations.idx;
41 }
42
43 /* Sequence number of oldest dirty journal entry */
44
45 static inline u64 last_seq(struct journal *j)
46 {
47         return atomic64_read(&j->seq) - fifo_used(&j->pin) + 1;
48 }
49
50 static inline u64 journal_pin_seq(struct journal *j,
51                                   struct journal_entry_pin_list *pin_list)
52 {
53         return last_seq(j) + fifo_entry_idx(&j->pin, pin_list);
54 }
55
56 static inline struct jset_entry *__jset_entry_type_next(struct jset *jset,
57                                         struct jset_entry *entry, unsigned type)
58 {
59         while (entry < vstruct_last(jset)) {
60                 if (JOURNAL_ENTRY_TYPE(entry) == type)
61                         return entry;
62
63                 entry = vstruct_next(entry);
64         }
65
66         return NULL;
67 }
68
/* Iterate @_entry over every entry of @_jset whose type is @_type. */
#define for_each_jset_entry_type(_entry, _jset, _type)                  \
        for (_entry = (_jset)->start;                                   \
             (_entry = __jset_entry_type_next(_jset, _entry, _type));   \
             _entry = vstruct_next(_entry))

/*
 * Iterate @_k over every key in every JOURNAL_ENTRY_BTREE_KEYS entry of
 * @_jset; @_n is the scratch cursor required by vstruct_for_each_safe().
 */
#define for_each_jset_key(_k, _n, _entry, _jset)                        \
        for_each_jset_entry_type(_entry, _jset, JOURNAL_ENTRY_BTREE_KEYS)\
                vstruct_for_each_safe(_entry, _k, _n)
77
78 static inline void bch2_journal_add_entry(struct journal_buf *buf,
79                                          const void *data, size_t u64s,
80                                          unsigned type, enum btree_id id,
81                                          unsigned level)
82 {
83         struct jset *jset = buf->data;
84
85         bch2_journal_add_entry_at(buf, data, u64s, type, id, level,
86                                  le32_to_cpu(jset->u64s));
87         le32_add_cpu(&jset->u64s, jset_u64s(u64s));
88 }
89
90 static struct jset_entry *bch2_journal_find_entry(struct jset *j, unsigned type,
91                                                  enum btree_id id)
92 {
93         struct jset_entry *entry;
94
95         for_each_jset_entry_type(entry, j, type)
96                 if (entry->btree_id == id)
97                         return entry;
98
99         return NULL;
100 }
101
102 struct bkey_i *bch2_journal_find_btree_root(struct bch_fs *c, struct jset *j,
103                                            enum btree_id id, unsigned *level)
104 {
105         struct bkey_i *k;
106         struct jset_entry *entry =
107                 bch2_journal_find_entry(j, JOURNAL_ENTRY_BTREE_ROOT, id);
108
109         if (!entry)
110                 return NULL;
111
112         k = entry->start;
113         *level = entry->level;
114         *level = entry->level;
115         return k;
116 }
117
118 static void bch2_journal_add_btree_root(struct journal_buf *buf,
119                                        enum btree_id id, struct bkey_i *k,
120                                        unsigned level)
121 {
122         bch2_journal_add_entry(buf, k, k->k.u64s,
123                               JOURNAL_ENTRY_BTREE_ROOT, id, level);
124 }
125
126 static inline void bch2_journal_add_prios(struct journal *j,
127                                          struct journal_buf *buf)
128 {
129         /*
130          * no prio bucket ptrs yet... XXX should change the allocator so this
131          * can't happen:
132          */
133         if (!buf->nr_prio_buckets)
134                 return;
135
136         bch2_journal_add_entry(buf, j->prio_buckets, buf->nr_prio_buckets,
137                               JOURNAL_ENTRY_PRIO_PTRS, 0, 0);
138 }
139
140 static void journal_seq_blacklist_flush(struct journal *j,
141                                 struct journal_entry_pin *pin, u64 seq)
142 {
143         struct bch_fs *c =
144                 container_of(j, struct bch_fs, journal);
145         struct journal_seq_blacklist *bl =
146                 container_of(pin, struct journal_seq_blacklist, pin);
147         struct blacklisted_node n;
148         struct closure cl;
149         unsigned i;
150         int ret;
151
152         closure_init_stack(&cl);
153
154         for (i = 0;; i++) {
155                 struct btree_iter iter;
156                 struct btree *b;
157
158                 mutex_lock(&j->blacklist_lock);
159                 if (i >= bl->nr_entries) {
160                         mutex_unlock(&j->blacklist_lock);
161                         break;
162                 }
163                 n = bl->entries[i];
164                 mutex_unlock(&j->blacklist_lock);
165
166                 bch2_btree_iter_init(&iter, c, n.btree_id, n.pos);
167                 iter.is_extents = false;
168 redo_peek:
169                 b = bch2_btree_iter_peek_node(&iter);
170
171                 /* The node might have already been rewritten: */
172
173                 if (b->data->keys.seq == n.seq) {
174                         ret = bch2_btree_node_rewrite(&iter, b, &cl);
175                         if (ret) {
176                                 bch2_btree_iter_unlock(&iter);
177                                 closure_sync(&cl);
178
179                                 if (ret == -EAGAIN ||
180                                     ret == -EINTR)
181                                         goto redo_peek;
182
183                                 /* -EROFS or perhaps -ENOSPC - bail out: */
184                                 /* XXX warn here */
185                                 return;
186                         }
187                 }
188
189                 bch2_btree_iter_unlock(&iter);
190         }
191
192         closure_sync(&cl);
193
194         for (i = 0;; i++) {
195                 struct btree_interior_update *as;
196                 struct pending_btree_node_free *d;
197
198                 mutex_lock(&j->blacklist_lock);
199                 if (i >= bl->nr_entries) {
200                         mutex_unlock(&j->blacklist_lock);
201                         break;
202                 }
203                 n = bl->entries[i];
204                 mutex_unlock(&j->blacklist_lock);
205 redo_wait:
206                 mutex_lock(&c->btree_interior_update_lock);
207
208                 /*
209                  * Is the node on the list of pending interior node updates -
210                  * being freed? If so, wait for that to finish:
211                  */
212                 for_each_pending_btree_node_free(c, as, d)
213                         if (n.seq       == d->seq &&
214                             n.btree_id  == d->btree_id &&
215                             !d->level &&
216                             !bkey_cmp(n.pos, d->key.k.p)) {
217                                 closure_wait(&as->wait, &cl);
218                                 mutex_unlock(&c->btree_interior_update_lock);
219                                 closure_sync(&cl);
220                                 goto redo_wait;
221                         }
222
223                 mutex_unlock(&c->btree_interior_update_lock);
224         }
225
226         mutex_lock(&j->blacklist_lock);
227
228         bch2_journal_pin_drop(j, &bl->pin);
229         list_del(&bl->list);
230         kfree(bl->entries);
231         kfree(bl);
232
233         mutex_unlock(&j->blacklist_lock);
234 }
235
236 static struct journal_seq_blacklist *
237 journal_seq_blacklist_find(struct journal *j, u64 seq)
238 {
239         struct journal_seq_blacklist *bl;
240
241         lockdep_assert_held(&j->blacklist_lock);
242
243         list_for_each_entry(bl, &j->seq_blacklist, list)
244                 if (seq == bl->seq)
245                         return bl;
246
247         return NULL;
248 }
249
250 static struct journal_seq_blacklist *
251 bch2_journal_seq_blacklisted_new(struct journal *j, u64 seq)
252 {
253         struct journal_seq_blacklist *bl;
254
255         lockdep_assert_held(&j->blacklist_lock);
256
257         /*
258          * When we start the journal, bch2_journal_start() will skip over @seq:
259          */
260
261         bl = kzalloc(sizeof(*bl), GFP_KERNEL);
262         if (!bl)
263                 return NULL;
264
265         bl->seq = seq;
266         list_add_tail(&bl->list, &j->seq_blacklist);
267         return bl;
268 }
269
270 /*
271  * Returns true if @seq is newer than the most recent journal entry that got
272  * written, and data corresponding to @seq should be ignored - also marks @seq
273  * as blacklisted so that on future restarts the corresponding data will still
274  * be ignored:
275  */
276 int bch2_journal_seq_should_ignore(struct bch_fs *c, u64 seq, struct btree *b)
277 {
278         struct journal *j = &c->journal;
279         struct journal_seq_blacklist *bl = NULL;
280         struct blacklisted_node *n;
281         u64 journal_seq, i;
282         int ret = 0;
283
284         if (!seq)
285                 return 0;
286
287         journal_seq = atomic64_read(&j->seq);
288
289         /* Interier updates aren't journalled: */
290         BUG_ON(b->level);
291         BUG_ON(seq > journal_seq && test_bit(BCH_FS_INITIAL_GC_DONE, &c->flags));
292
293         /*
294          * Decrease this back to j->seq + 2 when we next rev the on disk format:
295          * increasing it temporarily to work around bug in old kernels
296          */
297         bch2_fs_inconsistent_on(seq > journal_seq + 4, c,
298                          "bset journal seq too far in the future: %llu > %llu",
299                          seq, journal_seq);
300
301         if (seq <= journal_seq &&
302             list_empty_careful(&j->seq_blacklist))
303                 return 0;
304
305         mutex_lock(&j->blacklist_lock);
306
307         if (seq <= journal_seq) {
308                 bl = journal_seq_blacklist_find(j, seq);
309                 if (!bl)
310                         goto out;
311         } else {
312                 bch_verbose(c, "btree node %u:%llu:%llu has future journal sequence number %llu, blacklisting",
313                             b->btree_id, b->key.k.p.inode, b->key.k.p.offset, seq);
314
315                 for (i = journal_seq + 1; i <= seq; i++) {
316                         bl = journal_seq_blacklist_find(j, i) ?:
317                                 bch2_journal_seq_blacklisted_new(j, i);
318                         if (!bl) {
319                                 ret = -ENOMEM;
320                                 goto out;
321                         }
322                 }
323         }
324
325         for (n = bl->entries; n < bl->entries + bl->nr_entries; n++)
326                 if (b->data->keys.seq   == n->seq &&
327                     b->btree_id         == n->btree_id &&
328                     !bkey_cmp(b->key.k.p, n->pos))
329                         goto found_entry;
330
331         if (!bl->nr_entries ||
332             is_power_of_2(bl->nr_entries)) {
333                 n = krealloc(bl->entries,
334                              max(bl->nr_entries * 2, 8UL) * sizeof(*n),
335                              GFP_KERNEL);
336                 if (!n) {
337                         ret = -ENOMEM;
338                         goto out;
339                 }
340                 bl->entries = n;
341         }
342
343         bl->entries[bl->nr_entries++] = (struct blacklisted_node) {
344                 .seq            = b->data->keys.seq,
345                 .btree_id       = b->btree_id,
346                 .pos            = b->key.k.p,
347         };
348 found_entry:
349         ret = 1;
350 out:
351         mutex_unlock(&j->blacklist_lock);
352         return ret;
353 }
354
355 /*
356  * Journal replay/recovery:
357  *
358  * This code is all driven from bch2_fs_start(); we first read the journal
359  * entries, do some other stuff, then we mark all the keys in the journal
360  * entries (same as garbage collection would), then we replay them - reinserting
361  * them into the cache in precisely the same order as they appear in the
362  * journal.
363  *
364  * We only journal keys that go in leaf nodes, which simplifies things quite a
365  * bit.
366  */
367
368 struct journal_list {
369         struct closure          cl;
370         struct mutex            lock;
371         struct list_head        *head;
372         int                     ret;
373 };
374
375 #define JOURNAL_ENTRY_ADD_OK            0
376 #define JOURNAL_ENTRY_ADD_OUT_OF_RANGE  5
377
378 /*
379  * Given a journal entry we just read, add it to the list of journal entries to
380  * be replayed:
381  */
382 static int journal_entry_add(struct bch_fs *c, struct journal_list *jlist,
383                     struct jset *j)
384 {
385         struct journal_replay *i, *pos;
386         struct list_head *where;
387         size_t bytes = vstruct_bytes(j);
388         __le64 last_seq;
389         int ret;
390
391         mutex_lock(&jlist->lock);
392
393         last_seq = !list_empty(jlist->head)
394                 ? list_last_entry(jlist->head, struct journal_replay,
395                                   list)->j.last_seq
396                 : 0;
397
398         /* Is this entry older than the range we need? */
399         if (le64_to_cpu(j->seq) < le64_to_cpu(last_seq)) {
400                 ret = JOURNAL_ENTRY_ADD_OUT_OF_RANGE;
401                 goto out;
402         }
403
404         /* Drop entries we don't need anymore */
405         list_for_each_entry_safe(i, pos, jlist->head, list) {
406                 if (le64_to_cpu(i->j.seq) >= le64_to_cpu(j->last_seq))
407                         break;
408                 list_del(&i->list);
409                 kvpfree(i, offsetof(struct journal_replay, j) +
410                         vstruct_bytes(&i->j));
411         }
412
413         list_for_each_entry_reverse(i, jlist->head, list) {
414                 /* Duplicate? */
415                 if (le64_to_cpu(j->seq) == le64_to_cpu(i->j.seq)) {
416                         fsck_err_on(bytes != vstruct_bytes(&i->j) ||
417                                     memcmp(j, &i->j, bytes), c,
418                                     "found duplicate but non identical journal entries (seq %llu)",
419                                     le64_to_cpu(j->seq));
420
421                         ret = JOURNAL_ENTRY_ADD_OK;
422                         goto out;
423                 }
424
425                 if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
426                         where = &i->list;
427                         goto add;
428                 }
429         }
430
431         where = jlist->head;
432 add:
433         i = kvpmalloc(offsetof(struct journal_replay, j) + bytes, GFP_KERNEL);
434         if (!i) {
435                 ret = -ENOMEM;
436                 goto out;
437         }
438
439         memcpy(&i->j, j, bytes);
440         list_add(&i->list, where);
441         ret = JOURNAL_ENTRY_ADD_OK;
442 out:
443 fsck_err:
444         mutex_unlock(&jlist->lock);
445         return ret;
446 }
447
448 static struct nonce journal_nonce(const struct jset *jset)
449 {
450         return (struct nonce) {{
451                 [0] = 0,
452                 [1] = ((__le32 *) &jset->seq)[0],
453                 [2] = ((__le32 *) &jset->seq)[1],
454                 [3] = BCH_NONCE_JOURNAL,
455         }};
456 }
457
458 static void journal_entry_null_range(void *start, void *end)
459 {
460         struct jset_entry *entry;
461
462         for (entry = start; entry != end; entry = vstruct_next(entry)) {
463                 entry->u64s     = 0;
464                 entry->btree_id = 0;
465                 entry->level    = 0;
466                 entry->flags    = 0;
467                 SET_JOURNAL_ENTRY_TYPE(entry, 0);
468         }
469 }
470
471 static int journal_validate_key(struct bch_fs *c, struct jset *j,
472                                 struct jset_entry *entry,
473                                 struct bkey_i *k, enum bkey_type key_type,
474                                 const char *type)
475 {
476         void *next = vstruct_next(entry);
477         const char *invalid;
478         char buf[160];
479         int ret = 0;
480
481         if (mustfix_fsck_err_on(!k->k.u64s, c,
482                         "invalid %s in journal: k->u64s 0", type)) {
483                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
484                 journal_entry_null_range(vstruct_next(entry), next);
485                 return 0;
486         }
487
488         if (mustfix_fsck_err_on((void *) bkey_next(k) >
489                                 (void *) vstruct_next(entry), c,
490                         "invalid %s in journal: extends past end of journal entry",
491                         type)) {
492                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
493                 journal_entry_null_range(vstruct_next(entry), next);
494                 return 0;
495         }
496
497         if (mustfix_fsck_err_on(k->k.format != KEY_FORMAT_CURRENT, c,
498                         "invalid %s in journal: bad format %u",
499                         type, k->k.format)) {
500                 le16_add_cpu(&entry->u64s, -k->k.u64s);
501                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
502                 journal_entry_null_range(vstruct_next(entry), next);
503                 return 0;
504         }
505
506         if (JSET_BIG_ENDIAN(j) != CPU_BIG_ENDIAN)
507                 bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
508
509         invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
510         if (invalid) {
511                 bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
512                                      bkey_i_to_s_c(k));
513                 mustfix_fsck_err(c, "invalid %s in journal: %s", type, buf);
514
515                 le16_add_cpu(&entry->u64s, -k->k.u64s);
516                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
517                 journal_entry_null_range(vstruct_next(entry), next);
518                 return 0;
519         }
520 fsck_err:
521         return ret;
522 }
523
524 #define JOURNAL_ENTRY_REREAD    5
525 #define JOURNAL_ENTRY_NONE      6
526 #define JOURNAL_ENTRY_BAD       7
527
528 static int journal_entry_validate(struct bch_fs *c,
529                                   struct jset *j, u64 sector,
530                                   unsigned bucket_sectors_left,
531                                   unsigned sectors_read)
532 {
533         struct jset_entry *entry;
534         size_t bytes = vstruct_bytes(j);
535         struct bch_csum csum;
536         int ret = 0;
537
538         if (le64_to_cpu(j->magic) != jset_magic(c))
539                 return JOURNAL_ENTRY_NONE;
540
541         if (le32_to_cpu(j->version) != BCACHE_JSET_VERSION) {
542                 bch_err(c, "unknown journal entry version %u",
543                         le32_to_cpu(j->version));
544                 return BCH_FSCK_UNKNOWN_VERSION;
545         }
546
547         if (mustfix_fsck_err_on(bytes > bucket_sectors_left << 9, c,
548                         "journal entry too big (%zu bytes), sector %lluu",
549                         bytes, sector)) {
550                 /* XXX: note we might have missing journal entries */
551                 return JOURNAL_ENTRY_BAD;
552         }
553
554         if (bytes > sectors_read << 9)
555                 return JOURNAL_ENTRY_REREAD;
556
557         if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(j)), c,
558                         "journal entry with unknown csum type %llu sector %lluu",
559                         JSET_CSUM_TYPE(j), sector))
560                 return JOURNAL_ENTRY_BAD;
561
562         csum = csum_vstruct(c, JSET_CSUM_TYPE(j), journal_nonce(j), j);
563         if (mustfix_fsck_err_on(bch2_crc_cmp(csum, j->csum), c,
564                         "journal checksum bad, sector %llu", sector)) {
565                 /* XXX: retry IO, when we start retrying checksum errors */
566                 /* XXX: note we might have missing journal entries */
567                 return JOURNAL_ENTRY_BAD;
568         }
569
570         bch2_encrypt(c, JSET_CSUM_TYPE(j), journal_nonce(j),
571                     j->encrypted_start,
572                     vstruct_end(j) - (void *) j->encrypted_start);
573
574         if (mustfix_fsck_err_on(le64_to_cpu(j->last_seq) > le64_to_cpu(j->seq), c,
575                         "invalid journal entry: last_seq > seq"))
576                 j->last_seq = j->seq;
577
578         vstruct_for_each(j, entry) {
579                 struct bkey_i *k;
580
581                 if (mustfix_fsck_err_on(vstruct_next(entry) >
582                                         vstruct_last(j), c,
583                                 "journal entry extents past end of jset")) {
584                         j->u64s = cpu_to_le64((u64 *) entry - j->_data);
585                         break;
586                 }
587
588                 switch (JOURNAL_ENTRY_TYPE(entry)) {
589                 case JOURNAL_ENTRY_BTREE_KEYS:
590                         vstruct_for_each(entry, k) {
591                                 ret = journal_validate_key(c, j, entry, k,
592                                                 bkey_type(entry->level,
593                                                           entry->btree_id),
594                                                 "key");
595                                 if (ret)
596                                         goto fsck_err;
597                         }
598                         break;
599
600                 case JOURNAL_ENTRY_BTREE_ROOT:
601                         k = entry->start;
602
603                         if (mustfix_fsck_err_on(!entry->u64s ||
604                                         le16_to_cpu(entry->u64s) != k->k.u64s, c,
605                                         "invalid btree root journal entry: wrong number of keys")) {
606                                 journal_entry_null_range(entry,
607                                                 vstruct_next(entry));
608                                 continue;
609                         }
610
611                         ret = journal_validate_key(c, j, entry, k,
612                                                    BKEY_TYPE_BTREE, "btree root");
613                         if (ret)
614                                 goto fsck_err;
615                         break;
616
617                 case JOURNAL_ENTRY_PRIO_PTRS:
618                         break;
619
620                 case JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED:
621                         if (mustfix_fsck_err_on(le16_to_cpu(entry->u64s) != 1, c,
622                                 "invalid journal seq blacklist entry: bad size")) {
623                                 journal_entry_null_range(entry,
624                                                 vstruct_next(entry));
625                         }
626
627                         break;
628                 default:
629                         mustfix_fsck_err(c, "invalid journal entry type %llu",
630                                  JOURNAL_ENTRY_TYPE(entry));
631                         journal_entry_null_range(entry, vstruct_next(entry));
632                         break;
633                 }
634         }
635
636 fsck_err:
637         return ret;
638 }
639
640 struct journal_read_buf {
641         void            *data;
642         size_t          size;
643 };
644
645 static int journal_read_buf_realloc(struct journal_read_buf *b,
646                                     size_t new_size)
647 {
648         void *n;
649
650         /* the bios are sized for this many pages, max: */
651         if (new_size > JOURNAL_ENTRY_SIZE_MAX)
652                 return -ENOMEM;
653
654         new_size = roundup_pow_of_two(new_size);
655         n = kvpmalloc(new_size, GFP_KERNEL);
656         if (!n)
657                 return -ENOMEM;
658
659         kvpfree(b->data, b->size);
660         b->data = n;
661         b->size = new_size;
662         return 0;
663 }
664
665 static int journal_read_bucket(struct bch_dev *ca,
666                                struct journal_read_buf *buf,
667                                struct journal_list *jlist,
668                                unsigned bucket, u64 *seq, bool *entries_found)
669 {
670         struct bch_fs *c = ca->fs;
671         struct journal_device *ja = &ca->journal;
672         struct bio *bio = ja->bio;
673         struct jset *j = NULL;
674         unsigned sectors, sectors_read = 0;
675         u64 offset = bucket_to_sector(ca, ja->buckets[bucket]),
676             end = offset + ca->mi.bucket_size;
677         bool saw_bad = false;
678         int ret = 0;
679
680         pr_debug("reading %u", bucket);
681
682         while (offset < end) {
683                 if (!sectors_read) {
684 reread:                 sectors_read = min_t(unsigned,
685                                 end - offset, buf->size >> 9);
686
687                         bio_reset(bio);
688                         bio->bi_bdev            = ca->disk_sb.bdev;
689                         bio->bi_iter.bi_sector  = offset;
690                         bio->bi_iter.bi_size    = sectors_read << 9;
691                         bio_set_op_attrs(bio, REQ_OP_READ, 0);
692                         bch2_bio_map(bio, buf->data);
693
694                         ret = submit_bio_wait(bio);
695
696                         if (bch2_dev_fatal_io_err_on(ret, ca,
697                                                   "journal read from sector %llu",
698                                                   offset) ||
699                             bch2_meta_read_fault("journal"))
700                                 return -EIO;
701
702                         j = buf->data;
703                 }
704
705                 ret = journal_entry_validate(c, j, offset,
706                                         end - offset, sectors_read);
707                 switch (ret) {
708                 case BCH_FSCK_OK:
709                         break;
710                 case JOURNAL_ENTRY_REREAD:
711                         if (vstruct_bytes(j) > buf->size) {
712                                 ret = journal_read_buf_realloc(buf,
713                                                         vstruct_bytes(j));
714                                 if (ret)
715                                         return ret;
716                         }
717                         goto reread;
718                 case JOURNAL_ENTRY_NONE:
719                         if (!saw_bad)
720                                 return 0;
721                         sectors = c->sb.block_size;
722                         goto next_block;
723                 case JOURNAL_ENTRY_BAD:
724                         saw_bad = true;
725                         sectors = c->sb.block_size;
726                         goto next_block;
727                 default:
728                         return ret;
729                 }
730
731                 /*
732                  * This happens sometimes if we don't have discards on -
733                  * when we've partially overwritten a bucket with new
734                  * journal entries. We don't need the rest of the
735                  * bucket:
736                  */
737                 if (le64_to_cpu(j->seq) < ja->bucket_seq[bucket])
738                         return 0;
739
740                 ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
741
742                 ret = journal_entry_add(c, jlist, j);
743                 switch (ret) {
744                 case JOURNAL_ENTRY_ADD_OK:
745                         *entries_found = true;
746                         break;
747                 case JOURNAL_ENTRY_ADD_OUT_OF_RANGE:
748                         break;
749                 default:
750                         return ret;
751                 }
752
753                 if (le64_to_cpu(j->seq) > *seq)
754                         *seq = le64_to_cpu(j->seq);
755
756                 sectors = vstruct_sectors(j, c->block_bits);
757 next_block:
758                 pr_debug("next");
759                 offset          += sectors;
760                 sectors_read    -= sectors;
761                 j = ((void *) j) + (sectors << 9);
762         }
763
764         return 0;
765 }
766
767 static void bch2_journal_read_device(struct closure *cl)
768 {
769 #define read_bucket(b)                                                  \
770         ({                                                              \
771                 bool entries_found = false;                             \
772                 ret = journal_read_bucket(ca, &buf, jlist, b, &seq,     \
773                                           &entries_found);              \
774                 if (ret)                                                \
775                         goto err;                                       \
776                 __set_bit(b, bitmap);                                   \
777                 entries_found;                                          \
778          })
779
780         struct journal_device *ja =
781                 container_of(cl, struct journal_device, read);
782         struct bch_dev *ca = container_of(ja, struct bch_dev, journal);
783         struct journal_list *jlist =
784                 container_of(cl->parent, struct journal_list, cl);
785         struct request_queue *q = bdev_get_queue(ca->disk_sb.bdev);
786         struct journal_read_buf buf = { NULL, 0 };
787
788         DECLARE_BITMAP(bitmap, ja->nr);
789         unsigned i, l, r;
790         u64 seq = 0;
791         int ret;
792
793         if (!ja->nr)
794                 goto out;
795
796         bitmap_zero(bitmap, ja->nr);
797         ret = journal_read_buf_realloc(&buf, PAGE_SIZE);
798         if (ret)
799                 goto err;
800
801         pr_debug("%u journal buckets", ja->nr);
802
803         /*
804          * If the device supports discard but not secure discard, we can't do
805          * the fancy fibonacci hash/binary search because the live journal
806          * entries might not form a contiguous range:
807          */
808         for (i = 0; i < ja->nr; i++)
809                 read_bucket(i);
810         goto search_done;
811
812         if (!blk_queue_nonrot(q))
813                 goto linear_scan;
814
815         /*
816          * Read journal buckets ordered by golden ratio hash to quickly
817          * find a sequence of buckets with valid journal entries
818          */
819         for (i = 0; i < ja->nr; i++) {
820                 l = (i * 2654435769U) % ja->nr;
821
822                 if (test_bit(l, bitmap))
823                         break;
824
825                 if (read_bucket(l))
826                         goto bsearch;
827         }
828
829         /*
830          * If that fails, check all the buckets we haven't checked
831          * already
832          */
833         pr_debug("falling back to linear search");
834 linear_scan:
835         for (l = find_first_zero_bit(bitmap, ja->nr);
836              l < ja->nr;
837              l = find_next_zero_bit(bitmap, ja->nr, l + 1))
838                 if (read_bucket(l))
839                         goto bsearch;
840
841         /* no journal entries on this device? */
842         if (l == ja->nr)
843                 goto out;
844 bsearch:
845         /* Binary search */
846         r = find_next_bit(bitmap, ja->nr, l + 1);
847         pr_debug("starting binary search, l %u r %u", l, r);
848
849         while (l + 1 < r) {
850                 unsigned m = (l + r) >> 1;
851                 u64 cur_seq = seq;
852
853                 read_bucket(m);
854
855                 if (cur_seq != seq)
856                         l = m;
857                 else
858                         r = m;
859         }
860
861 search_done:
862         /*
863          * Find the journal bucket with the highest sequence number:
864          *
865          * If there's duplicate journal entries in multiple buckets (which
866          * definitely isn't supposed to happen, but...) - make sure to start
867          * cur_idx at the last of those buckets, so we don't deadlock trying to
868          * allocate
869          */
870         seq = 0;
871
872         for (i = 0; i < ja->nr; i++)
873                 if (ja->bucket_seq[i] >= seq &&
874                     ja->bucket_seq[i] != ja->bucket_seq[(i + 1) % ja->nr]) {
875                         /*
876                          * When journal_next_bucket() goes to allocate for
877                          * the first time, it'll use the bucket after
878                          * ja->cur_idx
879                          */
880                         ja->cur_idx = i;
881                         seq = ja->bucket_seq[i];
882                 }
883
884         /*
885          * Set last_idx to indicate the entire journal is full and needs to be
886          * reclaimed - journal reclaim will immediately reclaim whatever isn't
887          * pinned when it first runs:
888          */
889         ja->last_idx = (ja->cur_idx + 1) % ja->nr;
890
891         /*
892          * Read buckets in reverse order until we stop finding more journal
893          * entries:
894          */
895         for (i = (ja->cur_idx + ja->nr - 1) % ja->nr;
896              i != ja->cur_idx;
897              i = (i + ja->nr - 1) % ja->nr)
898                 if (!test_bit(i, bitmap) &&
899                     !read_bucket(i))
900                         break;
901 out:
902         kvpfree(buf.data, buf.size);
903         percpu_ref_put(&ca->io_ref);
904         closure_return(cl);
905 err:
906         mutex_lock(&jlist->lock);
907         jlist->ret = ret;
908         mutex_unlock(&jlist->lock);
909         goto out;
910 #undef read_bucket
911 }
912
913 void bch2_journal_entries_free(struct list_head *list)
914 {
915
916         while (!list_empty(list)) {
917                 struct journal_replay *i =
918                         list_first_entry(list, struct journal_replay, list);
919                 list_del(&i->list);
920                 kvpfree(i, offsetof(struct journal_replay, j) +
921                         vstruct_bytes(&i->j));
922         }
923 }
924
925 static int journal_seq_blacklist_read(struct journal *j,
926                                       struct journal_replay *i,
927                                       struct journal_entry_pin_list *p)
928 {
929         struct bch_fs *c = container_of(j, struct bch_fs, journal);
930         struct jset_entry *entry;
931         struct journal_seq_blacklist *bl;
932         u64 seq;
933
934         for_each_jset_entry_type(entry, &i->j,
935                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
936                 seq = le64_to_cpu(entry->_data[0]);
937
938                 bch_verbose(c, "blacklisting existing journal seq %llu", seq);
939
940                 bl = bch2_journal_seq_blacklisted_new(j, seq);
941                 if (!bl)
942                         return -ENOMEM;
943
944                 journal_pin_add_entry(j, p, &bl->pin,
945                                   journal_seq_blacklist_flush);
946                 bl->written = true;
947         }
948
949         return 0;
950 }
951
952 static inline bool journal_has_keys(struct list_head *list)
953 {
954         struct journal_replay *i;
955         struct jset_entry *entry;
956         struct bkey_i *k, *_n;
957
958         list_for_each_entry(i, list, list)
959                 for_each_jset_key(k, _n, entry, &i->j)
960                         return true;
961
962         return false;
963 }
964
/*
 * bch2_journal_read - read the journal from every readable member device
 * and set up in-memory journal state from what was found.
 *
 * @c:    filesystem being started
 * @list: list head that the per-device readers fill with journal_replay
 *        entries (it is handed to them via jlist.head)
 *
 * Steps, in order:
 *  - start bch2_journal_read_device() on each readable member and wait for
 *    all of them;
 *  - initialize j->seq, j->last_seq_ondisk and the pin fifo bounds from the
 *    last entry on @list;
 *  - give every entry that was read a pin count of 1 and register the
 *    blacklist entries it carries;
 *  - walk the entries checking for blacklisted or missing sequence numbers;
 *  - copy the prio pointers out of the last entry, if it has any.
 *
 * Returns jlist.ret if any device reader reported an error,
 * BCH_FSCK_REPAIR_IMPOSSIBLE if no entries were found, -ENOMEM if a
 * blacklist entry could not be allocated, otherwise ret (initialized to 0;
 * presumably the fsck_err_on() macros may set it and jump to the fsck_err
 * label - confirm against their definition).
 */
int bch2_journal_read(struct bch_fs *c, struct list_head *list)
{
        struct journal *j = &c->journal;
        struct jset_entry *prio_ptrs;
        struct journal_list jlist;
        struct journal_replay *i;
        struct journal_entry_pin_list *p;
        struct bch_dev *ca;
        u64 cur_seq, end_seq;
        unsigned iter, keys = 0, entries = 0;
        int ret = 0;

        closure_init_stack(&jlist.cl);
        mutex_init(&jlist.lock);
        jlist.head = list;
        jlist.ret = 0;

        /*
         * One reader per device, all parented to jlist.cl; each takes an
         * io_ref here that bch2_journal_read_device() drops when it is done:
         */
        for_each_readable_member(ca, c, iter) {
                percpu_ref_get(&ca->io_ref);
                closure_call(&ca->journal.read,
                             bch2_journal_read_device,
                             system_unbound_wq,
                             &jlist.cl);
        }

        closure_sync(&jlist.cl);

        if (jlist.ret)
                return jlist.ret;

        if (list_empty(list)){
                bch_err(c, "no journal entries found");
                return BCH_FSCK_REPAIR_IMPOSSIBLE;
        }

        fsck_err_on(c->sb.clean && journal_has_keys(list), c,
                    "filesystem marked clean but journal has keys to replay");

        /* the last entry on the list supplies seq and last_seq */
        i = list_last_entry(list, struct journal_replay, list);

        unfixable_fsck_err_on(le64_to_cpu(i->j.seq) -
                        le64_to_cpu(i->j.last_seq) + 1 > j->pin.size, c,
                        "too many journal entries open for refcount fifo");

        atomic64_set(&j->seq, le64_to_cpu(i->j.seq));
        j->last_seq_ondisk = le64_to_cpu(i->j.last_seq);

        /* one fifo slot for every sequence number in [last_seq, seq] */
        j->pin.front    = le64_to_cpu(i->j.last_seq);
        j->pin.back     = le64_to_cpu(i->j.seq) + 1;

        BUG_ON(last_seq(j) != le64_to_cpu(i->j.last_seq));
        BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
               &fifo_peek_back(&j->pin));

        /* start with every slot unpinned ... */
        fifo_for_each_entry_ptr(p, &j->pin, iter) {
                INIT_LIST_HEAD(&p->list);
                atomic_set(&p->count, 0);
        }

        mutex_lock(&j->blacklist_lock);

        /* ... then pin the slot of each entry we actually read */
        list_for_each_entry(i, list, list) {
                p = journal_seq_pin(j, le64_to_cpu(i->j.seq));

                atomic_set(&p->count, 1);

                if (journal_seq_blacklist_read(j, i, p)) {
                        mutex_unlock(&j->blacklist_lock);
                        return -ENOMEM;
                }
        }

        mutex_unlock(&j->blacklist_lock);

        cur_seq = last_seq(j);
        end_seq = le64_to_cpu(list_last_entry(list,
                                struct journal_replay, list)->j.seq);

        /*
         * cur_seq tracks the next sequence number we expect to see; gaps
         * made up entirely of blacklisted sequence numbers are skipped
         * rather than reported as missing:
         */
        list_for_each_entry(i, list, list) {
                struct jset_entry *entry;
                struct bkey_i *k, *_n;
                bool blacklisted;

                mutex_lock(&j->blacklist_lock);
                while (cur_seq < le64_to_cpu(i->j.seq) &&
                       journal_seq_blacklist_find(j, cur_seq))
                        cur_seq++;

                blacklisted = journal_seq_blacklist_find(j,
                                                         le64_to_cpu(i->j.seq));
                mutex_unlock(&j->blacklist_lock);

                fsck_err_on(blacklisted, c,
                            "found blacklisted journal entry %llu",
                            le64_to_cpu(i->j.seq));

                fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
                        "journal entries %llu-%llu missing! (replaying %llu-%llu)",
                        cur_seq, le64_to_cpu(i->j.seq) - 1,
                        last_seq(j), end_seq);

                cur_seq = le64_to_cpu(i->j.seq) + 1;

                /* only counted for the summary message below */
                for_each_jset_key(k, _n, entry, &i->j)
                        keys++;
                entries++;
        }

        bch_info(c, "journal read done, %i keys in %i entries, seq %llu",
                 keys, entries, (u64) atomic64_read(&j->seq));

        /*
         * NOTE(review): the copy length comes straight from the on-disk
         * entry; no bound against the size of j->prio_buckets is visible
         * here - confirm it is validated when the entry is read.
         */
        i = list_last_entry(list, struct journal_replay, list);
        prio_ptrs = bch2_journal_find_entry(&i->j, JOURNAL_ENTRY_PRIO_PTRS, 0);
        if (prio_ptrs) {
                memcpy_u64s(j->prio_buckets,
                            prio_ptrs->_data,
                            le16_to_cpu(prio_ptrs->u64s));
                j->nr_prio_buckets = le16_to_cpu(prio_ptrs->u64s);
        }
fsck_err:
        return ret;
}
1087
1088 int bch2_journal_mark(struct bch_fs *c, struct list_head *list)
1089 {
1090         struct bkey_i *k, *n;
1091         struct jset_entry *j;
1092         struct journal_replay *r;
1093         int ret;
1094
1095         list_for_each_entry(r, list, list)
1096                 for_each_jset_key(k, n, j, &r->j) {
1097                         enum bkey_type type = bkey_type(j->level, j->btree_id);
1098                         struct bkey_s_c k_s_c = bkey_i_to_s_c(k);
1099
1100                         if (btree_type_has_ptrs(type)) {
1101                                 ret = bch2_btree_mark_key_initial(c, type, k_s_c);
1102                                 if (ret)
1103                                         return ret;
1104                         }
1105                 }
1106
1107         return 0;
1108 }
1109
1110 static bool journal_entry_is_open(struct journal *j)
1111 {
1112         return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
1113 }
1114
1115 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
1116 {
1117         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1118         struct journal_buf *w = journal_prev_buf(j);
1119
1120         atomic_dec_bug(&journal_seq_pin(j, w->data->seq)->count);
1121
1122         if (!need_write_just_set &&
1123             test_bit(JOURNAL_NEED_WRITE, &j->flags))
1124                 __bch2_time_stats_update(j->delay_time,
1125                                         j->need_write_time);
1126 #if 0
1127         closure_call(&j->io, journal_write, NULL, &c->cl);
1128 #else
1129         /* Shut sparse up: */
1130         closure_init(&j->io, &c->cl);
1131         set_closure_fn(&j->io, journal_write, NULL);
1132         journal_write(&j->io);
1133 #endif
1134 }
1135
1136 static void __journal_entry_new(struct journal *j, int count)
1137 {
1138         struct journal_entry_pin_list *p = fifo_push_ref(&j->pin);
1139
1140         /*
1141          * The fifo_push() needs to happen at the same time as j->seq is
1142          * incremented for last_seq() to be calculated correctly
1143          */
1144         atomic64_inc(&j->seq);
1145
1146         BUG_ON(journal_seq_pin(j, atomic64_read(&j->seq)) !=
1147                &fifo_peek_back(&j->pin));
1148
1149         INIT_LIST_HEAD(&p->list);
1150         atomic_set(&p->count, count);
1151 }
1152
1153 static void __bch2_journal_next_entry(struct journal *j)
1154 {
1155         struct journal_buf *buf;
1156
1157         __journal_entry_new(j, 1);
1158
1159         buf = journal_cur_buf(j);
1160         memset(buf->has_inode, 0, sizeof(buf->has_inode));
1161
1162         memset(buf->data, 0, sizeof(*buf->data));
1163         buf->data->seq  = cpu_to_le64(atomic64_read(&j->seq));
1164         buf->data->u64s = 0;
1165 }
1166
1167 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
1168 {
1169         unsigned ret = BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1170
1171         if (buf->nr_prio_buckets)
1172                 ret += JSET_KEYS_U64s + buf->nr_prio_buckets;
1173
1174         return ret;
1175 }
1176
/*
 * journal_buf_switch - close the currently open journal entry, switch to the
 * other journal buffer and start writing the one just closed.
 *
 * Must be called with j->lock held.
 *
 * Returns:
 *  JOURNAL_ENTRY_CLOSED - the entry was already closed; j->lock still held
 *  JOURNAL_ENTRY_ERROR  - the journal is in the error state; j->lock still
 *                         held
 *  JOURNAL_ENTRY_INUSE  - the previous buffer has not been written yet, so
 *                         there is nothing to switch to; j->lock still held
 *  JOURNAL_UNLOCKED     - the switch happened and j->lock has been DROPPED
 */
static enum {
        JOURNAL_ENTRY_ERROR,
        JOURNAL_ENTRY_INUSE,
        JOURNAL_ENTRY_CLOSED,
        JOURNAL_UNLOCKED,
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct journal_buf *buf;
        union journal_res_state old, new;
        u64 v = atomic64_read(&j->reservations.counter);

        lockdep_assert_held(&j->lock);

        /*
         * Reservation state is updated with cmpxchg; retry until our update
         * applies to the value we based it on:
         */
        do {
                old.v = new.v = v;
                if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
                        return JOURNAL_ENTRY_CLOSED;

                if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
                        return JOURNAL_ENTRY_ERROR;

                if (new.prev_buf_unwritten)
                        return JOURNAL_ENTRY_INUSE;

                /*
                 * avoid race between setting buf->data->u64s and
                 * journal_res_put starting write:
                 */
                journal_state_inc(&new);

                new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
                new.idx++;
                new.prev_buf_unwritten = 1;

                BUG_ON(journal_state_count(new, new.idx));
        } while ((v = atomic64_cmpxchg(&j->reservations.counter,
                                       old.v, new.v)) != old.v);

        journal_reclaim_fast(j);

        clear_bit(JOURNAL_NEED_WRITE, &j->flags);

        /* finalize the header of the entry we just closed */
        buf = &j->buf[old.idx];
        buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
        buf->data->last_seq     = cpu_to_le64(last_seq(j));

        /* sectors the closed entry will occupy, including the reserve */
        j->prev_buf_sectors =
                vstruct_blocks_plus(buf->data, c->block_bits,
                                    journal_entry_u64s_reserve(buf)) *
                c->sb.block_size;

        BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);

        __bch2_journal_next_entry(j);

        cancel_delayed_work(&j->write_work);
        /* success path only: the caller gets the journal back unlocked */
        spin_unlock(&j->lock);

        if (c->bucket_journal_seq > 1 << 14) {
                c->bucket_journal_seq = 0;
                bch2_bucket_seq_cleanup(c);
        }

        /* ugh - might be called from __journal_res_get() under wait_event() */
        __set_current_state(TASK_RUNNING);
        /* drop the count taken by journal_state_inc() in the loop above */
        bch2_journal_buf_put(j, old.idx, need_write_just_set);

        return JOURNAL_UNLOCKED;
}
1247
1248 void bch2_journal_halt(struct journal *j)
1249 {
1250         union journal_res_state old, new;
1251         u64 v = atomic64_read(&j->reservations.counter);
1252
1253         do {
1254                 old.v = new.v = v;
1255                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1256                         return;
1257
1258                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
1259         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1260                                        old.v, new.v)) != old.v);
1261
1262         wake_up(&j->wait);
1263         closure_wake_up(&journal_cur_buf(j)->wait);
1264         closure_wake_up(&journal_prev_buf(j)->wait);
1265 }
1266
1267 static unsigned journal_dev_buckets_available(struct journal *j,
1268                                               struct bch_dev *ca)
1269 {
1270         struct journal_device *ja = &ca->journal;
1271         unsigned next = (ja->cur_idx + 1) % ja->nr;
1272         unsigned available = (ja->last_idx + ja->nr - next) % ja->nr;
1273
1274         /*
1275          * Hack to avoid a deadlock during journal replay:
1276          * journal replay might require setting a new btree
1277          * root, which requires writing another journal entry -
1278          * thus, if the journal is full (and this happens when
1279          * replaying the first journal bucket's entries) we're
1280          * screwed.
1281          *
1282          * So don't let the journal fill up unless we're in
1283          * replay:
1284          */
1285         if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
1286                 available = max((int) available - 2, 0);
1287
1288         /*
1289          * Don't use the last bucket unless writing the new last_seq
1290          * will make another bucket available:
1291          */
1292         if (ja->bucket_seq[ja->last_idx] >= last_seq(j))
1293                 available = max((int) available - 1, 0);
1294
1295         return available;
1296 }
1297
/*
 * returns number of sectors available for next journal entry:
 *
 * Must be called with j->lock held.
 *
 * Returns -EROFS when fewer devices are in the journal's device group than
 * metadata_replicas_required, 0 when too few of them have enough free
 * journal buckets, otherwise the smallest bucket size among the devices in
 * the group.
 *
 * NOTE(review): with an empty device group and metadata_replicas_required
 * of 0, sectors_available is still UINT_MAX at the final return and is
 * converted to int - confirm that combination cannot occur.
 */
static int journal_entry_sectors(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct bch_dev *ca;
        struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
        unsigned sectors_available = UINT_MAX;
        unsigned i, nr_online = 0, nr_devs = 0;

        lockdep_assert_held(&j->lock);

        spin_lock(&j->devs.lock);
        group_for_each_dev(ca, &j->devs, i) {
                unsigned buckets_required = 0;

                /*
                 * Running minimum: the checks below use the value as of the
                 * devices visited so far, not the final minimum.
                 */
                sectors_available = min_t(unsigned, sectors_available,
                                          ca->mi.bucket_size);

                /*
                 * Note that we don't allocate the space for a journal entry
                 * until we write it out - thus, if we haven't started the write
                 * for the previous entry we have to make sure we have space for
                 * it too:
                 */
                if (bch2_extent_has_device(e.c, ca->dev_idx)) {
                        /* j->key already points at this device */
                        if (j->prev_buf_sectors > ca->journal.sectors_free)
                                buckets_required++;

                        if (j->prev_buf_sectors + sectors_available >
                            ca->journal.sectors_free)
                                buckets_required++;
                } else {
                        /* not in j->key: at least one fresh bucket is needed */
                        if (j->prev_buf_sectors + sectors_available >
                            ca->mi.bucket_size)
                                buckets_required++;

                        buckets_required++;
                }

                /* nr_devs counts devices with room, nr_online all devices */
                if (journal_dev_buckets_available(j, ca) >= buckets_required)
                        nr_devs++;
                nr_online++;
        }
        spin_unlock(&j->devs.lock);

        if (nr_online < c->opts.metadata_replicas_required)
                return -EROFS;

        if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
                return 0;

        return sectors_available;
}
1351
1352 /*
1353  * should _only_ called from journal_res_get() - when we actually want a
1354  * journal reservation - journal entry is open means journal is dirty:
1355  */
1356 static int journal_entry_open(struct journal *j)
1357 {
1358         struct journal_buf *buf = journal_cur_buf(j);
1359         ssize_t u64s;
1360         int ret = 0, sectors;
1361
1362         lockdep_assert_held(&j->lock);
1363         BUG_ON(journal_entry_is_open(j));
1364
1365         if (!fifo_free(&j->pin))
1366                 return 0;
1367
1368         sectors = journal_entry_sectors(j);
1369         if (sectors <= 0)
1370                 return sectors;
1371
1372         buf->disk_sectors       = sectors;
1373
1374         sectors = min_t(unsigned, sectors, buf->size >> 9);
1375
1376         j->cur_buf_sectors      = sectors;
1377         buf->nr_prio_buckets    = j->nr_prio_buckets;
1378
1379         u64s = (sectors << 9) / sizeof(u64);
1380
1381         /* Subtract the journal header */
1382         u64s -= sizeof(struct jset) / sizeof(u64);
1383         /*
1384          * Btree roots, prio pointers don't get added until right before we do
1385          * the write:
1386          */
1387         u64s -= journal_entry_u64s_reserve(buf);
1388         u64s  = max_t(ssize_t, 0L, u64s);
1389
1390         BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
1391
1392         if (u64s > le32_to_cpu(buf->data->u64s)) {
1393                 union journal_res_state old, new;
1394                 u64 v = atomic64_read(&j->reservations.counter);
1395
1396                 /*
1397                  * Must be set before marking the journal entry as open:
1398                  */
1399                 j->cur_entry_u64s = u64s;
1400
1401                 do {
1402                         old.v = new.v = v;
1403
1404                         if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1405                                 return false;
1406
1407                         /* Handle any already added entries */
1408                         new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
1409                 } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1410                                                old.v, new.v)) != old.v);
1411                 ret = 1;
1412
1413                 wake_up(&j->wait);
1414
1415                 if (j->res_get_blocked_start) {
1416                         __bch2_time_stats_update(j->blocked_time,
1417                                                 j->res_get_blocked_start);
1418                         j->res_get_blocked_start = 0;
1419                 }
1420
1421                 mod_delayed_work(system_freezable_wq,
1422                                  &j->write_work,
1423                                  msecs_to_jiffies(j->write_delay_ms));
1424         }
1425
1426         return ret;
1427 }
1428
/*
 * bch2_journal_start - mark the journal started and set up its first entry.
 *
 * Advances j->seq past every blacklisted sequence number, initializes the
 * first journal entry, adds a JOURNAL_SEQ_BLACKLISTED entry to it for each
 * blacklist entry not yet written, and kicks off journal reclaim.
 */
void bch2_journal_start(struct bch_fs *c)
{
        struct journal *j = &c->journal;
        struct journal_seq_blacklist *bl;
        u64 new_seq = 0;

        /* highest blacklisted sequence number, 0 if the list is empty */
        list_for_each_entry(bl, &j->seq_blacklist, list)
                new_seq = max(new_seq, bl->seq);

        spin_lock(&j->lock);

        set_bit(JOURNAL_STARTED, &j->flags);

        /* burn sequence numbers (pin count 0) until we reach new_seq */
        while (atomic64_read(&j->seq) < new_seq)
                __journal_entry_new(j, 0);

        /*
         * journal_buf_switch() only inits the next journal entry when it
         * closes an open journal entry - the very first journal entry gets
         * initialized here:
         */
        __bch2_journal_next_entry(j);

        /*
         * Adding entries to the next journal entry before allocating space on
         * disk for the next journal entry - this is ok, because these entries
         * only have to go down with the next journal entry we write:
         */
        list_for_each_entry(bl, &j->seq_blacklist, list)
                if (!bl->written) {
                        bch2_journal_add_entry(journal_cur_buf(j), &bl->seq, 1,
                                        JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED,
                                        0, 0);

                        /* pin on the newest slot, i.e. the entry just set up */
                        journal_pin_add_entry(j,
                                              &fifo_peek_back(&j->pin),
                                              &bl->pin,
                                              journal_seq_blacklist_flush);
                        bl->written = true;
                }

        spin_unlock(&j->lock);

        queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
}
1474
1475 int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
1476 {
1477         struct journal *j = &c->journal;
1478         struct bkey_i *k, *_n;
1479         struct jset_entry *entry;
1480         struct journal_replay *i, *n;
1481         int ret = 0, did_replay = 0;
1482
1483         list_for_each_entry_safe(i, n, list, list) {
1484                 j->replay_pin_list =
1485                         journal_seq_pin(j, le64_to_cpu(i->j.seq));
1486
1487                 for_each_jset_key(k, _n, entry, &i->j) {
1488                         struct disk_reservation disk_res;
1489
1490                         /*
1491                          * We might cause compressed extents to be split, so we
1492                          * need to pass in a disk_reservation:
1493                          */
1494                         BUG_ON(bch2_disk_reservation_get(c, &disk_res, 0, 0));
1495
1496                         ret = bch2_btree_insert(c, entry->btree_id, k,
1497                                                &disk_res, NULL, NULL,
1498                                                BTREE_INSERT_NOFAIL|
1499                                                BTREE_INSERT_JOURNAL_REPLAY);
1500                         bch2_disk_reservation_put(c, &disk_res);
1501
1502                         if (ret) {
1503                                 bch_err(c, "journal replay: error %d while replaying key",
1504                                         ret);
1505                                 goto err;
1506                         }
1507
1508                         cond_resched();
1509                         did_replay = true;
1510                 }
1511
1512                 if (atomic_dec_and_test(&j->replay_pin_list->count))
1513                         wake_up(&j->wait);
1514         }
1515
1516         j->replay_pin_list = NULL;
1517
1518         if (did_replay) {
1519                 bch2_btree_flush(c);
1520
1521                 /*
1522                  * Write a new journal entry _before_ we start journalling new data -
1523                  * otherwise, we could end up with btree node bsets with journal seqs
1524                  * arbitrarily far in the future vs. the most recently written journal
1525                  * entry on disk, if we crash before writing the next journal entry:
1526                  */
1527                 ret = bch2_journal_meta(j);
1528                 if (ret) {
1529                         bch_err(c, "journal replay: error %d flushing journal", ret);
1530                         goto err;
1531                 }
1532         }
1533
1534         bch2_journal_set_replay_done(j);
1535 err:
1536         bch2_journal_entries_free(list);
1537         return ret;
1538 }
1539
1540 #if 0
/*
 * Allocate more journal space at runtime - not currently making use of it,
 * but the code works.
 *
 * Grows the journal on @ca to @nr buckets; requests for @nr no larger than
 * the current size are a no-op that returns 0. New buckets are inserted at
 * ja->last_idx, in the in-memory arrays and in the superblock journal field
 * alike, and the superblock is written afterwards.
 *
 * Returns 0 on success, -ENOSPC if the disk reservation could not be
 * obtained, -ENOMEM if an allocation or the superblock resize failed.
 */
static int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
                                      unsigned nr)
{
        struct journal *j = &c->journal;
        struct journal_device *ja = &ca->journal;
        struct bch_sb_field_journal *journal_buckets;
        struct disk_reservation disk_res = { 0, 0 };
        struct closure cl;
        u64 *new_bucket_seq = NULL, *new_buckets = NULL;
        int ret = 0;

        closure_init_stack(&cl);

        /* don't handle reducing nr of buckets yet: */
        if (nr <= ja->nr)
                return 0;

        /*
         * note: journal buckets aren't really counted as _sectors_ used yet, so
         * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
         * when space used goes up without a reservation - but we do need the
         * reservation to ensure we'll actually be able to allocate:
         */

        if (bch2_disk_reservation_get(c, &disk_res,
                        (nr - ja->nr) << ca->bucket_bits, 0))
                return -ENOSPC;

        mutex_lock(&c->sb_lock);

        /* every failure from here down to the superblock resize is -ENOMEM */
        ret = -ENOMEM;
        new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
        new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
        if (!new_buckets || !new_bucket_seq)
                goto err;

        journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
                                nr + sizeof(*journal_buckets) / sizeof(u64));
        if (!journal_buckets)
                goto err;

        spin_lock(&j->lock);
        memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
        memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
        /*
         * After the swaps ja-> holds the new, larger arrays, and the new_*
         * locals hold the old ones - which is what gets kfree()d at the end:
         */
        swap(new_buckets,       ja->buckets);
        swap(new_bucket_seq,    ja->bucket_seq);

        while (ja->nr < nr) {
                /* must happen under journal lock, to avoid racing with gc: */
                u64 b = bch2_bucket_alloc(ca, RESERVE_NONE);
                if (!b) {
                        /* nothing free: drop the lock while we wait, retry */
                        if (!closure_wait(&c->freelist_wait, &cl)) {
                                spin_unlock(&j->lock);
                                closure_sync(&cl);
                                spin_lock(&j->lock);
                        }
                        continue;
                }

                bch2_mark_metadata_bucket(ca, &ca->buckets[b],
                                         BUCKET_JOURNAL, false);
                bch2_mark_alloc_bucket(ca, &ca->buckets[b], false);

                /* open up a hole at last_idx in all three arrays */
                memmove(ja->buckets + ja->last_idx + 1,
                        ja->buckets + ja->last_idx,
                        (ja->nr - ja->last_idx) * sizeof(u64));
                memmove(ja->bucket_seq + ja->last_idx + 1,
                        ja->bucket_seq + ja->last_idx,
                        (ja->nr - ja->last_idx) * sizeof(u64));
                memmove(journal_buckets->buckets + ja->last_idx + 1,
                        journal_buckets->buckets + ja->last_idx,
                        (ja->nr - ja->last_idx) * sizeof(u64));

                ja->buckets[ja->last_idx] = b;
                journal_buckets->buckets[ja->last_idx] = cpu_to_le64(b);

                /* indices at or past the insertion point shift up by one */
                if (ja->last_idx < ja->nr) {
                        if (ja->cur_idx >= ja->last_idx)
                                ja->cur_idx++;
                        ja->last_idx++;
                }
                ja->nr++;

        }
        spin_unlock(&j->lock);

        BUG_ON(bch2_validate_journal_layout(ca->disk_sb.sb, ca->mi));

        bch2_write_super(c);

        ret = 0;
err:
        mutex_unlock(&c->sb_lock);

        kfree(new_bucket_seq);
        kfree(new_buckets);
        bch2_disk_reservation_put(c, &disk_res);

        return ret;
}
1645 #endif
1646
1647 int bch2_dev_journal_alloc(struct bch_dev *ca)
1648 {
1649         struct journal_device *ja = &ca->journal;
1650         struct bch_sb_field_journal *journal_buckets;
1651         unsigned i, nr;
1652         u64 b, *p;
1653
1654         if (dynamic_fault("bcachefs:add:journal_alloc"))
1655                 return -ENOMEM;
1656
1657         /*
1658          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
1659          * is smaller:
1660          */
1661         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
1662                      BCH_JOURNAL_BUCKETS_MIN,
1663                      min(1 << 10,
1664                          (1 << 20) / ca->mi.bucket_size));
1665
1666         p = krealloc(ja->bucket_seq, nr * sizeof(u64),
1667                      GFP_KERNEL|__GFP_ZERO);
1668         if (!p)
1669                 return -ENOMEM;
1670
1671         ja->bucket_seq = p;
1672
1673         p = krealloc(ja->buckets, nr * sizeof(u64),
1674                      GFP_KERNEL|__GFP_ZERO);
1675         if (!p)
1676                 return -ENOMEM;
1677
1678         ja->buckets = p;
1679
1680         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1681                                 nr + sizeof(*journal_buckets) / sizeof(u64));
1682         if (!journal_buckets)
1683                 return -ENOMEM;
1684
1685         for (i = 0, b = ca->mi.first_bucket;
1686              i < nr && b < ca->mi.nbuckets; b++) {
1687                 if (!is_available_bucket(ca->buckets[b].mark))
1688                         continue;
1689
1690                 bch2_mark_metadata_bucket(ca, &ca->buckets[b],
1691                                          BUCKET_JOURNAL, true);
1692                 ja->buckets[i] = b;
1693                 journal_buckets->buckets[i] = cpu_to_le64(b);
1694                 i++;
1695         }
1696
1697         if (i < nr)
1698                 return -ENOSPC;
1699
1700         BUG_ON(bch2_validate_journal_layout(ca->disk_sb.sb, ca->mi));
1701
1702         ja->nr = nr;
1703
1704         return 0;
1705 }
1706
1707 /* Journalling */
1708
1709 /**
1710  * journal_reclaim_fast - do the fast part of journal reclaim
1711  *
1712  * Called from IO submission context, does not block. Cleans up after btree
1713  * write completions by advancing the journal pin and each cache's last_idx,
1714  * kicking off discards and background reclaim as necessary.
1715  */
1716 static void journal_reclaim_fast(struct journal *j)
1717 {
1718         struct journal_entry_pin_list temp;
1719         bool popped = false;
1720
1721         lockdep_assert_held(&j->lock);
1722
1723         /*
1724          * Unpin journal entries whose reference counts reached zero, meaning
1725          * all btree nodes got written out
1726          */
1727         while (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1728                 BUG_ON(!list_empty(&fifo_peek_front(&j->pin).list));
1729                 BUG_ON(!fifo_pop(&j->pin, temp));
1730                 popped = true;
1731         }
1732
1733         if (popped)
1734                 wake_up(&j->wait);
1735 }
1736
1737 /*
1738  * Journal entry pinning - machinery for holding a reference on a given journal
1739  * entry, marking it as dirty:
1740  */
1741
1742 static inline void __journal_pin_add(struct journal *j,
1743                                      struct journal_entry_pin_list *pin_list,
1744                                      struct journal_entry_pin *pin,
1745                                      journal_pin_flush_fn flush_fn)
1746 {
1747         BUG_ON(journal_pin_active(pin));
1748
1749         atomic_inc(&pin_list->count);
1750         pin->pin_list   = pin_list;
1751         pin->flush      = flush_fn;
1752
1753         if (flush_fn)
1754                 list_add(&pin->list, &pin_list->list);
1755         else
1756                 INIT_LIST_HEAD(&pin->list);
1757 }
1758
1759 static void journal_pin_add_entry(struct journal *j,
1760                                   struct journal_entry_pin_list *pin_list,
1761                                   struct journal_entry_pin *pin,
1762                                   journal_pin_flush_fn flush_fn)
1763 {
1764         spin_lock_irq(&j->pin_lock);
1765         __journal_pin_add(j, pin_list, pin, flush_fn);
1766         spin_unlock_irq(&j->pin_lock);
1767 }
1768
1769 void bch2_journal_pin_add(struct journal *j,
1770                           struct journal_res *res,
1771                           struct journal_entry_pin *pin,
1772                           journal_pin_flush_fn flush_fn)
1773 {
1774         struct journal_entry_pin_list *pin_list = res->ref
1775                 ? journal_seq_pin(j, res->seq)
1776                 : j->replay_pin_list;
1777
1778         spin_lock_irq(&j->pin_lock);
1779         __journal_pin_add(j, pin_list, pin, flush_fn);
1780         spin_unlock_irq(&j->pin_lock);
1781 }
1782
1783 static inline bool __journal_pin_drop(struct journal *j,
1784                                       struct journal_entry_pin *pin)
1785 {
1786         struct journal_entry_pin_list *pin_list = pin->pin_list;
1787
1788         pin->pin_list = NULL;
1789
1790         /* journal_reclaim_work() might have already taken us off the list */
1791         if (!list_empty_careful(&pin->list))
1792                 list_del_init(&pin->list);
1793
1794         return atomic_dec_and_test(&pin_list->count);
1795 }
1796
1797 void bch2_journal_pin_drop(struct journal *j,
1798                           struct journal_entry_pin *pin)
1799 {
1800         unsigned long flags;
1801         bool wakeup;
1802
1803         if (!journal_pin_active(pin))
1804                 return;
1805
1806         spin_lock_irqsave(&j->pin_lock, flags);
1807         wakeup = __journal_pin_drop(j, pin);
1808         spin_unlock_irqrestore(&j->pin_lock, flags);
1809
1810         /*
1811          * Unpinning a journal entry make make journal_next_bucket() succeed, if
1812          * writing a new last_seq will now make another bucket available:
1813          *
1814          * Nested irqsave is expensive, don't do the wakeup with lock held:
1815          */
1816         if (wakeup)
1817                 wake_up(&j->wait);
1818 }
1819
1820 void bch2_journal_pin_add_if_older(struct journal *j,
1821                                   struct journal_entry_pin *src_pin,
1822                                   struct journal_entry_pin *pin,
1823                                   journal_pin_flush_fn flush_fn)
1824 {
1825         spin_lock_irq(&j->pin_lock);
1826
1827         if (journal_pin_active(src_pin) &&
1828             (!journal_pin_active(pin) ||
1829              fifo_entry_idx(&j->pin, src_pin->pin_list) <
1830              fifo_entry_idx(&j->pin, pin->pin_list))) {
1831                 if (journal_pin_active(pin))
1832                         __journal_pin_drop(j, pin);
1833                 __journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
1834         }
1835
1836         spin_unlock_irq(&j->pin_lock);
1837 }
1838
1839 static struct journal_entry_pin *
1840 journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1841 {
1842         struct journal_entry_pin_list *pin_list;
1843         struct journal_entry_pin *ret = NULL;
1844         unsigned iter;
1845
1846         /* so we don't iterate over empty fifo entries below: */
1847         if (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1848                 spin_lock(&j->lock);
1849                 journal_reclaim_fast(j);
1850                 spin_unlock(&j->lock);
1851         }
1852
1853         spin_lock_irq(&j->pin_lock);
1854         fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
1855                 if (journal_pin_seq(j, pin_list) > seq_to_flush)
1856                         break;
1857
1858                 ret = list_first_entry_or_null(&pin_list->list,
1859                                 struct journal_entry_pin, list);
1860                 if (ret) {
1861                         /* must be list_del_init(), see bch2_journal_pin_drop() */
1862                         list_del_init(&ret->list);
1863                         *seq = journal_pin_seq(j, pin_list);
1864                         break;
1865                 }
1866         }
1867         spin_unlock_irq(&j->pin_lock);
1868
1869         return ret;
1870 }
1871
1872 static bool journal_has_pins(struct journal *j)
1873 {
1874         bool ret;
1875
1876         spin_lock(&j->lock);
1877         journal_reclaim_fast(j);
1878         ret = fifo_used(&j->pin) > 1 ||
1879                 atomic_read(&fifo_peek_front(&j->pin).count) > 1;
1880         spin_unlock(&j->lock);
1881
1882         return ret;
1883 }
1884
1885 void bch2_journal_flush_pins(struct journal *j)
1886 {
1887         struct journal_entry_pin *pin;
1888         u64 seq;
1889
1890         while ((pin = journal_get_next_pin(j, U64_MAX, &seq)))
1891                 pin->flush(j, pin, seq);
1892
1893         wait_event(j->wait, !journal_has_pins(j) || bch2_journal_error(j));
1894 }
1895
1896 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
1897 {
1898         bool ret;
1899
1900         spin_lock(&j->lock);
1901         ret = ja->nr &&
1902                 (ja->last_idx != ja->cur_idx &&
1903                  ja->bucket_seq[ja->last_idx] < j->last_seq_ondisk);
1904         spin_unlock(&j->lock);
1905
1906         return ret;
1907 }
1908
1909 /**
1910  * journal_reclaim_work - free up journal buckets
1911  *
1912  * Background journal reclaim writes out btree nodes. It should be run
1913  * early enough so that we never completely run out of journal buckets.
1914  *
1915  * High watermarks for triggering background reclaim:
1916  * - FIFO has fewer than 512 entries left
1917  * - fewer than 25% journal buckets free
1918  *
1919  * Background reclaim runs until low watermarks are reached:
1920  * - FIFO has more than 1024 entries left
1921  * - more than 50% journal buckets free
1922  *
1923  * As long as a reclaim can complete in the time it takes to fill up
1924  * 512 journal entries or 25% of all journal buckets, then
1925  * journal_next_bucket() should not stall.
      *
      * Runs as the delayed work item j->reclaim_work, and is also invoked
      * directly from __journal_res_get() when the journal is full. Unless
      * BCH_FS_RO is set it requeues itself after j->reclaim_delay_ms.
1926  */
1927 static void journal_reclaim_work(struct work_struct *work)
1928 {
1929         struct bch_fs *c = container_of(to_delayed_work(work),
1930                                 struct bch_fs, journal.reclaim_work);
1931         struct journal *j = &c->journal;
1932         struct bch_dev *ca;
1933         struct journal_entry_pin *pin;
1934         u64 seq, seq_to_flush = 0;
1935         unsigned iter, bucket_to_flush;
1936         unsigned long next_flush;
1937         bool reclaim_lock_held = false, need_flush;
1938
1939         /*
1940          * Advance last_idx to point to the oldest journal entry containing
1941          * btree node updates that have not yet been written out
1942          */
1943         for_each_rw_member(ca, c, iter) {
1944                 struct journal_device *ja = &ca->journal;
1945
                      /* devices without journal buckets have nothing to reclaim */
1946                 if (!ja->nr)
1947                         continue;
1948
1949                 while (should_discard_bucket(j, ja)) {
1950                         if (!reclaim_lock_held) {
1951                                 /*
1952                                  * ugh:
1953                                  * might be called from __journal_res_get()
1954                                  * under wait_event() - have to go back to
1955                                  * TASK_RUNNING before doing something that
1956                                  * would block, but only if we're doing work:
1957                                  */
1958                                 __set_current_state(TASK_RUNNING);
1959
1960                                 mutex_lock(&j->reclaim_lock);
1961                                 reclaim_lock_held = true;
1962                                 /* recheck under reclaim_lock: */
1963                                 continue;
1964                         }
1965
                              /*
                               * Discard the bucket at last_idx if the device has
                               * discards enabled and its queue supports them; the
                               * return value of blkdev_issue_discard() is not checked.
                               */
1966                         if (ca->mi.discard &&
1967                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
1968                                 blkdev_issue_discard(ca->disk_sb.bdev,
1969                                         bucket_to_sector(ca,
1970                                                 ja->buckets[ja->last_idx]),
1971                                         ca->mi.bucket_size, GFP_NOIO, 0);
1972
                              /* last_idx is a ring index: wrap at ja->nr */
1973                         spin_lock(&j->lock);
1974                         ja->last_idx = (ja->last_idx + 1) % ja->nr;
1975                         spin_unlock(&j->lock);
1976
1977                         wake_up(&j->wait);
1978                 }
1979
1980                 /*
1981                  * Write out enough btree nodes to free up 50% journal
1982                  * buckets
1983                  */
1984                 spin_lock(&j->lock);
1985                 bucket_to_flush = (ja->cur_idx + (ja->nr >> 1)) % ja->nr;
1986                 seq_to_flush = max_t(u64, seq_to_flush,
1987                                      ja->bucket_seq[bucket_to_flush]);
1988                 spin_unlock(&j->lock);
1989         }
1990
1991         if (reclaim_lock_held)
1992                 mutex_unlock(&j->reclaim_lock);
1993
1994         /* Also flush if the pin fifo is more than half full */
              /* (compared as s64 so a negative difference does not win the max) */
1995         seq_to_flush = max_t(s64, seq_to_flush,
1996                              (s64) atomic64_read(&j->seq) -
1997                              (j->pin.size >> 1));
1998
1999         /*
2000          * If it's been longer than j->reclaim_delay_ms since we last flushed,
2001          * make sure to flush at least one journal pin:
2002          */
2003         next_flush = j->last_flushed + msecs_to_jiffies(j->reclaim_delay_ms);
2004         need_flush = time_after(jiffies, next_flush);
2005
2006         while ((pin = journal_get_next_pin(j, need_flush
2007                                            ? U64_MAX
2008                                            : seq_to_flush, &seq))) {
2009                 __set_current_state(TASK_RUNNING);
2010                 pin->flush(j, pin, seq);
                      /* only the first pin is forced; later ones are bounded by seq_to_flush */
2011                 need_flush = false;
2012
2013                 j->last_flushed = jiffies;
2014         }
2015
              /* re-arm the delayed work unless the filesystem has gone read-only */
2016         if (!test_bit(BCH_FS_RO, &c->flags))
2017                 queue_delayed_work(system_freezable_wq, &j->reclaim_work,
2018                                    msecs_to_jiffies(j->reclaim_delay_ms));
2019 }
2020
2021 /**
2022  * journal_write_alloc - set up the device pointers for the next journal write
      *
      * @j:       journal being written
      * @sectors: size in sectors of the journal entry about to be written
      *
      * Updates the extent in j->key: pointers whose device is no longer RW or
      * has too little space left in its current bucket are dropped, and new
      * pointers are appended (each advancing that device to its next journal
      * bucket) until c->opts.metadata_replicas pointers are present or the
      * candidate devices run out.
      *
      * Returns 0 on success, or -EROFS if fewer than
      * c->opts.metadata_replicas_required pointers could be set up.
2023  */
2024 static int journal_write_alloc(struct journal *j, unsigned sectors)
2025 {
2026         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2027         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
2028         struct bch_extent_ptr *ptr;
2029         struct journal_device *ja;
2030         struct bch_dev *ca;
2031         bool swapped;
2032         unsigned i, replicas, replicas_want =
2033                 READ_ONCE(c->opts.metadata_replicas);
2034
2035         spin_lock(&j->lock);
2036
2037         /*
2038          * Drop any pointers to devices that have been removed, are no longer
2039          * empty, or filled up their current journal bucket:
2040          *
2041          * Note that a device may have had a small amount of free space (perhaps
2042          * one sector) that wasn't enough for the smallest possible journal
2043          * entry - that's why we drop pointers to devices <= current free space,
2044          * i.e. whichever device was limiting the current journal entry size.
2045          */
2046         extent_for_each_ptr_backwards(e, ptr) {
2047                 ca = c->devs[ptr->dev];
2048
2049                 if (ca->mi.state != BCH_MEMBER_STATE_RW ||
2050                     ca->journal.sectors_free <= sectors)
2051                         __bch2_extent_drop_ptr(e, ptr);
2052                 else
2053                         ca->journal.sectors_free -= sectors;
2054         }
2055
              /* pointers that survived count towards the replicas we want */
2056         replicas = bch2_extent_nr_ptrs(e.c);
2057
2058         spin_lock(&j->devs.lock);
2059
2060         /* Sort by tier: */
              /* (bubble sort, ascending mi.tier, repeated until a pass makes no swap) */
2061         do {
2062                 swapped = false;
2063
2064                 for (i = 0; i + 1 < j->devs.nr; i++)
2065                         if (j->devs.d[i + 0].dev->mi.tier >
2066                             j->devs.d[i + 1].dev->mi.tier) {
2067                                 swap(j->devs.d[i], j->devs.d[i + 1]);
2068                                 swapped = true;
2069                         }
2070         } while (swapped);
2071
2072         /*
2073          * Pick devices for next journal write:
2074          * XXX: sort devices by free journal space?
2075          */
2076         group_for_each_dev(ca, &j->devs, i) {
2077                 ja = &ca->journal;
2078
2079                 if (replicas >= replicas_want)
2080                         break;
2081
2082                 /*
2083                  * Check that we can use this device, and aren't already using
2084                  * it:
2085                  */
2086                 if (bch2_extent_has_device(e.c, ca->dev_idx) ||
2087                     !journal_dev_buckets_available(j, ca) ||
2088                     sectors > ca->mi.bucket_size)
2089                         continue;
2090
                      /*
                       * Move this device on to its next journal bucket (ring index,
                       * wraps at ja->nr), account for this write, and tag the
                       * bucket with the current journal sequence number:
                       */
2091                 ja->sectors_free = ca->mi.bucket_size - sectors;
2092                 ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
2093                 ja->bucket_seq[ja->cur_idx] = atomic64_read(&j->seq);
2094
2095                 extent_ptr_append(bkey_i_to_extent(&j->key),
2096                         (struct bch_extent_ptr) {
2097                                   .offset = bucket_to_sector(ca,
2098                                         ja->buckets[ja->cur_idx]),
2099                                   .dev = ca->dev_idx,
2100                 });
2101                 replicas++;
2102         }
2103         spin_unlock(&j->devs.lock);
2104
2105         j->prev_buf_sectors = 0;
2106         spin_unlock(&j->lock);
2107
2108         if (replicas < c->opts.metadata_replicas_required)
2109                 return -EROFS;
2110
              /* reached only when metadata_replicas_required <= replicas */
2111         BUG_ON(!replicas);
2112
2113         return 0;
2114 }
2115
/*
 * journal_write_compact - compact the entries of @jset in place
 *
 * Rewrites the jset_entries of @jset towards the front of the buffer and
 * updates jset->u64s to the compacted size.
 */
2116 static void journal_write_compact(struct jset *jset)
2117 {
              /* prev: last entry kept so far in the compacted output, NULL if none */
2118         struct jset_entry *i, *next, *prev = NULL;
2119
2120         /*
2121          * Simple compaction, dropping empty jset_entries (from journal
2122          * reservations that weren't fully used) and merging jset_entries that
2123          * can be.
2124          *
2125          * If we wanted to be really fancy here, we could sort all the keys in
2126          * the jset and drop keys that were overwritten - probably not worth it:
2127          */
2128         vstruct_for_each_safe(jset, i, next) {
2129                 unsigned u64s = le16_to_cpu(i->u64s);
2130
2131                 /* Empty entry: */
2132                 if (!u64s)
2133                         continue;
2134
2135                 /* Can we merge with previous entry? */
                      /*
                       * Only JOURNAL_ENTRY_BTREE_KEYS entries for the same btree id
                       * and level are merged, and only while the combined size
                       * still fits in the 16 bit u64s field:
                       */
2136                 if (prev &&
2137                     i->btree_id == prev->btree_id &&
2138                     i->level    == prev->level &&
2139                     JOURNAL_ENTRY_TYPE(i) == JOURNAL_ENTRY_TYPE(prev) &&
2140                     JOURNAL_ENTRY_TYPE(i) == JOURNAL_ENTRY_BTREE_KEYS &&
2141                     le16_to_cpu(prev->u64s) + u64s <= U16_MAX) {
                              /* append i's payload (not its header) to the end of prev */
2142                         memmove_u64s_down(vstruct_next(prev),
2143                                           i->_data,
2144                                           u64s);
2145                         le16_add_cpu(&prev->u64s, u64s);
2146                         continue;
2147                 }
2148
2149                 /* Couldn't merge, move i into new position (after prev): */
2150                 prev = prev ? vstruct_next(prev) : jset->start;
2151                 if (i != prev)
2152                         memmove_u64s_down(prev, i, jset_u64s(u64s));
2153         }
2154
              /*
               * Step past the last kept entry (or stay at jset->start if nothing
               * was kept); the new size is that position's offset from
               * jset->_data, in u64s:
               */
2155         prev = prev ? vstruct_next(prev) : jset->start;
2156         jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
2157 }
2158
2159 static void journal_write_endio(struct bio *bio)
2160 {
2161         struct bch_dev *ca = bio->bi_private;
2162         struct journal *j = &ca->fs->journal;
2163
2164         if (bch2_dev_fatal_io_err_on(bio->bi_error, ca, "journal write") ||
2165             bch2_meta_write_fault("journal"))
2166                 bch2_journal_halt(j);
2167
2168         closure_put(&j->io);
2169         percpu_ref_put(&ca->io_ref);
2170 }
2171
2172 static void journal_write_done(struct closure *cl)
2173 {
2174         struct journal *j = container_of(cl, struct journal, io);
2175         struct journal_buf *w = journal_prev_buf(j);
2176
2177         j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
2178
2179         __bch2_time_stats_update(j->write_time, j->write_start_time);
2180
2181         BUG_ON(!j->reservations.prev_buf_unwritten);
2182         atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
2183                      &j->reservations.counter);
2184
2185         /*
2186          * XXX: this is racy, we could technically end up doing the wake up
2187          * after the journal_buf struct has been reused for the next write
2188          * (because we're clearing JOURNAL_IO_IN_FLIGHT) and wake up things that
2189          * are waiting on the _next_ write, not this one.
2190          *
2191          * The wake up can't come before, because journal_flush_seq_async() is
2192          * looking at JOURNAL_IO_IN_FLIGHT when it has to wait on a journal
2193          * write that was already in flight.
2194          *
2195          * The right fix is to use a lock here, but using j.lock here means it
2196          * has to be a spin_lock_irqsave() lock which then requires propagating
2197          * the irq()ness to other locks and it's all kinds of nastiness.
2198          */
2199
2200         closure_wake_up(&w->wait);
2201         wake_up(&j->wait);
2202
2203         /*
2204          * Updating last_seq_ondisk may let journal_reclaim_work() discard more
2205          * buckets:
2206          */
2207         mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
2208 }
2209
2210 static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
2211 {
2212         /* we aren't holding j->lock: */
2213         unsigned new_size = READ_ONCE(j->buf_size_want);
2214         void *new_buf;
2215
2216         if (buf->size >= new_size)
2217                 return;
2218
2219         new_buf = kvpmalloc(new_size, GFP_NOIO|__GFP_NOWARN);
2220         if (!new_buf)
2221                 return;
2222
2223         memcpy(new_buf, buf->data, buf->size);
2224         kvpfree(buf->data, buf->size);
2225         buf->data       = new_buf;
2226         buf->size       = new_size;
2227 }
2228
2229 static void journal_write(struct closure *cl)
2230 {
2231         struct journal *j = container_of(cl, struct journal, io);
2232         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2233         struct bch_dev *ca;
2234         struct journal_buf *w = journal_prev_buf(j);
2235         struct jset *jset;
2236         struct bio *bio;
2237         struct bch_extent_ptr *ptr;
2238         unsigned i, sectors, bytes;
2239
2240         journal_buf_realloc(j, w);
2241         jset = w->data;
2242
2243         j->write_start_time = local_clock();
2244
2245         bch2_journal_add_prios(j, w);
2246
2247         mutex_lock(&c->btree_root_lock);
2248         for (i = 0; i < BTREE_ID_NR; i++) {
2249                 struct btree_root *r = &c->btree_roots[i];
2250
2251                 if (r->alive)
2252                         bch2_journal_add_btree_root(w, i, &r->key, r->level);
2253         }
2254         mutex_unlock(&c->btree_root_lock);
2255
2256         journal_write_compact(jset);
2257
2258         jset->read_clock        = cpu_to_le16(c->prio_clock[READ].hand);
2259         jset->write_clock       = cpu_to_le16(c->prio_clock[WRITE].hand);
2260         jset->magic             = cpu_to_le64(jset_magic(c));
2261         jset->version           = cpu_to_le32(BCACHE_JSET_VERSION);
2262
2263         SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
2264         SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
2265
2266         bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
2267                     jset->encrypted_start,
2268                     vstruct_end(jset) - (void *) jset->encrypted_start);
2269
2270         jset->csum = csum_vstruct(c, JSET_CSUM_TYPE(jset),
2271                                   journal_nonce(jset), jset);
2272
2273         sectors = vstruct_sectors(jset, c->block_bits);
2274         BUG_ON(sectors > j->prev_buf_sectors);
2275
2276         bytes = vstruct_bytes(w->data);
2277         memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
2278
2279         if (journal_write_alloc(j, sectors)) {
2280                 bch2_journal_halt(j);
2281                 bch_err(c, "Unable to allocate journal write");
2282                 bch2_fatal_error(c);
2283                 closure_return_with_destructor(cl, journal_write_done);
2284         }
2285
2286         bch2_check_mark_super(c, &j->key, true);
2287
2288         /*
2289          * XXX: we really should just disable the entire journal in nochanges
2290          * mode
2291          */
2292         if (c->opts.nochanges)
2293                 goto no_io;
2294
2295         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr) {
2296                 ca = c->devs[ptr->dev];
2297                 if (!percpu_ref_tryget(&ca->io_ref)) {
2298                         /* XXX: fix this */
2299                         bch_err(c, "missing device for journal write\n");
2300                         continue;
2301                 }
2302
2303                 atomic64_add(sectors, &ca->meta_sectors_written);
2304
2305                 bio = ca->journal.bio;
2306                 bio_reset(bio);
2307                 bio->bi_iter.bi_sector  = ptr->offset;
2308                 bio->bi_bdev            = ca->disk_sb.bdev;
2309                 bio->bi_iter.bi_size    = sectors << 9;
2310                 bio->bi_end_io          = journal_write_endio;
2311                 bio->bi_private         = ca;
2312                 bio_set_op_attrs(bio, REQ_OP_WRITE,
2313                                  REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
2314                 bch2_bio_map(bio, jset);
2315
2316                 trace_journal_write(bio);
2317                 closure_bio_submit(bio, cl);
2318
2319                 ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
2320         }
2321
2322         for_each_rw_member(ca, c, i)
2323                 if (journal_flushes_device(ca) &&
2324                     !bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key), i)) {
2325                         percpu_ref_get(&ca->io_ref);
2326
2327                         bio = ca->journal.bio;
2328                         bio_reset(bio);
2329                         bio->bi_bdev            = ca->disk_sb.bdev;
2330                         bio->bi_end_io          = journal_write_endio;
2331                         bio->bi_private         = ca;
2332                         bio_set_op_attrs(bio, REQ_OP_WRITE, WRITE_FLUSH);
2333                         closure_bio_submit(bio, cl);
2334                 }
2335
2336 no_io:
2337         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr)
2338                 ptr->offset += sectors;
2339
2340         closure_return_with_destructor(cl, journal_write_done);
2341 }
2342
2343 static void journal_write_work(struct work_struct *work)
2344 {
2345         struct journal *j = container_of(to_delayed_work(work),
2346                                          struct journal, write_work);
2347         spin_lock(&j->lock);
2348         set_bit(JOURNAL_NEED_WRITE, &j->flags);
2349
2350         if (journal_buf_switch(j, false) != JOURNAL_UNLOCKED)
2351                 spin_unlock(&j->lock);
2352 }
2353
2354 /*
2355  * Given an inode number, if that inode number has data in the journal that
2356  * hasn't yet been flushed, return the journal sequence number that needs to be
2357  * flushed:
2358  */
2359 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
2360 {
2361         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
2362         u64 seq = 0;
2363
2364         if (!test_bit(h, j->buf[0].has_inode) &&
2365             !test_bit(h, j->buf[1].has_inode))
2366                 return 0;
2367
2368         spin_lock(&j->lock);
2369         if (test_bit(h, journal_cur_buf(j)->has_inode))
2370                 seq = atomic64_read(&j->seq);
2371         else if (test_bit(h, journal_prev_buf(j)->has_inode))
2372                 seq = atomic64_read(&j->seq) - 1;
2373         spin_unlock(&j->lock);
2374
2375         return seq;
2376 }
2377
/*
 * __journal_res_get - one attempt at getting a journal reservation
 *
 * Used as the wait_event() condition in bch2_journal_res_get_slowpath().
 *
 * Returns nonzero and positive if a reservation was obtained, a negative
 * error (-EROFS, or whatever negative value journal_entry_open() returned) on
 * failure, or 0 if the caller has to wait and try again.
 */
2378 static int __journal_res_get(struct journal *j, struct journal_res *res,
2379                               unsigned u64s_min, unsigned u64s_max)
2380 {
2381         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2382         struct journal_buf *buf;
2383         int ret;
2384 retry:
2385         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2386         if (ret)
2387                 return ret;
2388
2389         spin_lock(&j->lock);
2390         /*
2391          * Recheck after taking the lock, so we don't race with another thread
2392          * that just did journal_entry_open() and call journal_entry_close()
2393          * unnecessarily
2394          */
2395         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2396         if (ret) {
2397                 spin_unlock(&j->lock);
2398                 return 1;
2399         }
2400
2401         /*
2402          * If we couldn't get a reservation because the current buf filled up,
2403          * and we had room for a bigger entry on disk, signal that we want to
2404          * realloc the journal bufs:
2405          */
2406         buf = journal_cur_buf(j);
2407         if (journal_entry_is_open(j) &&
2408             buf->size >> 9 < buf->disk_sectors &&
2409             buf->size < JOURNAL_ENTRY_SIZE_MAX)
2410                 j->buf_size_want = max(j->buf_size_want, buf->size << 1);
2411
2412         /*
2413          * Close the current journal entry if necessary, then try to start a new
2414          * one:
2415          */
2416         switch (journal_buf_switch(j, false)) {
2417         case JOURNAL_ENTRY_ERROR:
2418                 spin_unlock(&j->lock);
2419                 return -EROFS;
2420         case JOURNAL_ENTRY_INUSE:
2421                 /* haven't finished writing out the previous one: */
2422                 spin_unlock(&j->lock);
2423                 trace_journal_entry_full(c);
2424                 goto blocked;
2425         case JOURNAL_ENTRY_CLOSED:
2426                 break;
2427         case JOURNAL_UNLOCKED:
                      /* no unlock on this path; retry takes j->lock again */
2428                 goto retry;
2429         }
2430
2431         /* We now have a new, closed journal buf - see if we can open it: */
2432         ret = journal_entry_open(j);
2433         spin_unlock(&j->lock);
2434
              /* negative: error; positive: entry opened, retry the reservation */
2435         if (ret < 0)
2436                 return ret;
2437         if (ret)
2438                 goto retry;
2439
2440         /* Journal's full, we have to wait */
2441
2442         /*
2443          * Direct reclaim - can't rely on reclaim from work item
2444          * due to freezing..
2445          */
2446         journal_reclaim_work(&j->reclaim_work.work);
2447
2448         trace_journal_full(c);
2449 blocked:
              /* record when we first blocked; ?: 1 keeps the stamp nonzero */
2450         if (!j->res_get_blocked_start)
2451                 j->res_get_blocked_start = local_clock() ?: 1;
2452         return 0;
2453 }
2454
2455 /*
2456  * Essentially the entry function to the journaling code. When bcachefs is doing
2457  * a btree insert, it calls this function to get the current journal write.
2458  * Journal write is the structure used set up journal writes. The calling
2459  * function will then add its keys to the structure, queuing them for the next
2460  * write.
2461  *
2462  * To ensure forward progress, the current task must not be holding any
2463  * btree node write locks.
2464  */
2465 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
2466                                  unsigned u64s_min, unsigned u64s_max)
2467 {
2468         int ret;
2469
2470         wait_event(j->wait,
2471                    (ret = __journal_res_get(j, res, u64s_min,
2472                                             u64s_max)));
2473         return ret < 0 ? ret : 0;
2474 }
2475
2476 void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
2477 {
2478         spin_lock(&j->lock);
2479
2480         BUG_ON(seq > atomic64_read(&j->seq));
2481
2482         if (bch2_journal_error(j)) {
2483                 spin_unlock(&j->lock);
2484                 return;
2485         }
2486
2487         if (seq == atomic64_read(&j->seq)) {
2488                 if (!closure_wait(&journal_cur_buf(j)->wait, parent))
2489                         BUG();
2490         } else if (seq + 1 == atomic64_read(&j->seq) &&
2491                    j->reservations.prev_buf_unwritten) {
2492                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2493                         BUG();
2494
2495                 smp_mb();
2496
2497                 /* check if raced with write completion (or failure) */
2498                 if (!j->reservations.prev_buf_unwritten ||
2499                     bch2_journal_error(j))
2500                         closure_wake_up(&journal_prev_buf(j)->wait);
2501         }
2502
2503         spin_unlock(&j->lock);
2504 }
2505
2506 void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
2507 {
2508         spin_lock(&j->lock);
2509
2510         BUG_ON(seq > atomic64_read(&j->seq));
2511
2512         if (bch2_journal_error(j)) {
2513                 spin_unlock(&j->lock);
2514                 return;
2515         }
2516
2517         if (seq == atomic64_read(&j->seq)) {
2518                 bool set_need_write = false;
2519
2520                 if (parent &&
2521                     !closure_wait(&journal_cur_buf(j)->wait, parent))
2522                         BUG();
2523
2524                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2525                         j->need_write_time = local_clock();
2526                         set_need_write = true;
2527                 }
2528
2529                 switch (journal_buf_switch(j, set_need_write)) {
2530                 case JOURNAL_ENTRY_ERROR:
2531                         if (parent)
2532                                 closure_wake_up(&journal_cur_buf(j)->wait);
2533                         break;
2534                 case JOURNAL_ENTRY_CLOSED:
2535                         /*
2536                          * Journal entry hasn't been opened yet, but caller
2537                          * claims it has something (seq == j->seq):
2538                          */
2539                         BUG();
2540                 case JOURNAL_ENTRY_INUSE:
2541                         break;
2542                 case JOURNAL_UNLOCKED:
2543                         return;
2544                 }
2545         } else if (parent &&
2546                    seq + 1 == atomic64_read(&j->seq) &&
2547                    j->reservations.prev_buf_unwritten) {
2548                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2549                         BUG();
2550
2551                 smp_mb();
2552
2553                 /* check if raced with write completion (or failure) */
2554                 if (!j->reservations.prev_buf_unwritten ||
2555                     bch2_journal_error(j))
2556                         closure_wake_up(&journal_prev_buf(j)->wait);
2557         }
2558
2559         spin_unlock(&j->lock);
2560 }
2561
2562 int bch2_journal_flush_seq(struct journal *j, u64 seq)
2563 {
2564         struct closure cl;
2565         u64 start_time = local_clock();
2566
2567         closure_init_stack(&cl);
2568         bch2_journal_flush_seq_async(j, seq, &cl);
2569         closure_sync(&cl);
2570
2571         bch2_time_stats_update(j->flush_seq_time, start_time);
2572
2573         return bch2_journal_error(j);
2574 }
2575
2576 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
2577 {
2578         struct journal_res res;
2579         unsigned u64s = jset_u64s(0);
2580
2581         memset(&res, 0, sizeof(res));
2582
2583         bch2_journal_res_get(j, &res, u64s, u64s);
2584         bch2_journal_res_put(j, &res);
2585
2586         bch2_journal_flush_seq_async(j, res.seq, parent);
2587 }
2588
2589 int bch2_journal_meta(struct journal *j)
2590 {
2591         struct journal_res res;
2592         unsigned u64s = jset_u64s(0);
2593         int ret;
2594
2595         memset(&res, 0, sizeof(res));
2596
2597         ret = bch2_journal_res_get(j, &res, u64s, u64s);
2598         if (ret)
2599                 return ret;
2600
2601         bch2_journal_res_put(j, &res);
2602
2603         return bch2_journal_flush_seq(j, res.seq);
2604 }
2605
2606 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
2607 {
2608         u64 seq, journal_seq;
2609
2610         spin_lock(&j->lock);
2611         journal_seq = atomic64_read(&j->seq);
2612
2613         if (journal_entry_is_open(j)) {
2614                 seq = journal_seq;
2615         } else if (journal_seq) {
2616                 seq = journal_seq - 1;
2617         } else {
2618                 spin_unlock(&j->lock);
2619                 return;
2620         }
2621         spin_unlock(&j->lock);
2622
2623         bch2_journal_flush_seq_async(j, seq, parent);
2624 }
2625
2626 int bch2_journal_flush(struct journal *j)
2627 {
2628         u64 seq, journal_seq;
2629
2630         spin_lock(&j->lock);
2631         journal_seq = atomic64_read(&j->seq);
2632
2633         if (journal_entry_is_open(j)) {
2634                 seq = journal_seq;
2635         } else if (journal_seq) {
2636                 seq = journal_seq - 1;
2637         } else {
2638                 spin_unlock(&j->lock);
2639                 return 0;
2640         }
2641         spin_unlock(&j->lock);
2642
2643         return bch2_journal_flush_seq(j, seq);
2644 }
2645
2646 ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
2647 {
2648         union journal_res_state *s = &j->reservations;
2649         struct bch_dev *ca;
2650         unsigned iter;
2651         ssize_t ret = 0;
2652
2653         rcu_read_lock();
2654         spin_lock(&j->lock);
2655
2656         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2657                          "active journal entries:\t%zu\n"
2658                          "seq:\t\t\t%llu\n"
2659                          "last_seq:\t\t%llu\n"
2660                          "last_seq_ondisk:\t%llu\n"
2661                          "reservation count:\t%u\n"
2662                          "reservation offset:\t%u\n"
2663                          "current entry u64s:\t%u\n"
2664                          "io in flight:\t\t%i\n"
2665                          "need write:\t\t%i\n"
2666                          "dirty:\t\t\t%i\n"
2667                          "replay done:\t\t%i\n",
2668                          fifo_used(&j->pin),
2669                          (u64) atomic64_read(&j->seq),
2670                          last_seq(j),
2671                          j->last_seq_ondisk,
2672                          journal_state_count(*s, s->idx),
2673                          s->cur_entry_offset,
2674                          j->cur_entry_u64s,
2675                          s->prev_buf_unwritten,
2676                          test_bit(JOURNAL_NEED_WRITE,   &j->flags),
2677                          journal_entry_is_open(j),
2678                          test_bit(JOURNAL_REPLAY_DONE,  &j->flags));
2679
2680         spin_lock(&j->devs.lock);
2681         group_for_each_dev(ca, &j->devs, iter) {
2682                 struct journal_device *ja = &ca->journal;
2683
2684                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
2685                                  "dev %u:\n"
2686                                  "\tnr\t\t%u\n"
2687                                  "\tcur_idx\t\t%u (seq %llu)\n"
2688                                  "\tlast_idx\t%u (seq %llu)\n",
2689                                  iter, ja->nr,
2690                                  ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
2691                                  ja->last_idx,  ja->bucket_seq[ja->last_idx]);
2692         }
2693         spin_unlock(&j->devs.lock);
2694
2695         spin_unlock(&j->lock);
2696         rcu_read_unlock();
2697
2698         return ret;
2699 }
2700
2701 static bool bch2_journal_writing_to_device(struct bch_dev *ca)
2702 {
2703         struct journal *j = &ca->fs->journal;
2704         bool ret;
2705
2706         spin_lock(&j->lock);
2707         ret = bch2_extent_has_device(bkey_i_to_s_c_extent(&j->key),
2708                                     ca->dev_idx);
2709         spin_unlock(&j->lock);
2710
2711         return ret;
2712 }
2713
2714 /*
2715  * This asumes that ca has already been marked read-only so that
2716  * journal_next_bucket won't pick buckets out of ca any more.
2717  * Hence, if the journal is not currently pointing to ca, there
2718  * will be no new writes to journal entries in ca after all the
2719  * pending ones have been flushed to disk.
2720  *
2721  * If the journal is being written to ca, write a new record, and
2722  * journal_next_bucket will notice that the device is no longer
2723  * writeable and pick a new set of devices to write to.
2724  */
2725
2726 int bch2_journal_move(struct bch_dev *ca)
2727 {
2728         u64 last_flushed_seq;
2729         struct journal_device *ja = &ca->journal;
2730         struct bch_fs *c = ca->fs;
2731         struct journal *j = &c->journal;
2732         unsigned i;
2733         int ret = 0;            /* Success */
2734
2735         if (bch2_journal_writing_to_device(ca)) {
2736                 /*
2737                  * bch_journal_meta will write a record and we'll wait
2738                  * for the write to complete.
2739                  * Actually writing the journal (journal_write_locked)
2740                  * will call journal_next_bucket which notices that the
2741                  * device is no longer writeable, and picks a new one.
2742                  */
2743                 bch2_journal_meta(j);
2744                 BUG_ON(bch2_journal_writing_to_device(ca));
2745         }
2746
2747         /*
2748          * Flush all btree updates to backing store so that any
2749          * journal entries written to ca become stale and are no
2750          * longer needed.
2751          */
2752
2753         /*
2754          * XXX: switch to normal journal reclaim machinery
2755          */
2756         bch2_btree_flush(c);
2757
2758         /*
2759          * Force a meta-data journal entry to be written so that
2760          * we have newer journal entries in devices other than ca,
2761          * and wait for the meta data write to complete.
2762          */
2763         bch2_journal_meta(j);
2764
2765         /*
2766          * Verify that we no longer need any of the journal entries in
2767          * the device
2768          */
2769         spin_lock(&j->lock);
2770         last_flushed_seq = last_seq(j);
2771         spin_unlock(&j->lock);
2772
2773         for (i = 0; i < ja->nr; i += 1)
2774                 BUG_ON(ja->bucket_seq[i] > last_flushed_seq);
2775
2776         return ret;
2777 }
2778
2779 void bch2_fs_journal_stop(struct journal *j)
2780 {
2781         if (!test_bit(JOURNAL_STARTED, &j->flags))
2782                 return;
2783
2784         /*
2785          * Empty out the journal by first flushing everything pinning existing
2786          * journal entries, then force a brand new empty journal entry to be
2787          * written:
2788          */
2789         bch2_journal_flush_pins(j);
2790         bch2_journal_flush_async(j, NULL);
2791         bch2_journal_meta(j);
2792
2793         cancel_delayed_work_sync(&j->write_work);
2794         cancel_delayed_work_sync(&j->reclaim_work);
2795 }
2796
2797 void bch2_dev_journal_exit(struct bch_dev *ca)
2798 {
2799         kfree(ca->journal.bio);
2800         kfree(ca->journal.buckets);
2801         kfree(ca->journal.bucket_seq);
2802
2803         ca->journal.bio         = NULL;
2804         ca->journal.buckets     = NULL;
2805         ca->journal.bucket_seq  = NULL;
2806 }
2807
2808 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
2809 {
2810         struct journal_device *ja = &ca->journal;
2811         struct bch_sb_field_journal *journal_buckets =
2812                 bch2_sb_get_journal(sb);
2813         unsigned i;
2814
2815         ja->nr = bch2_nr_journal_buckets(journal_buckets);
2816
2817         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2818         if (!ja->bucket_seq)
2819                 return -ENOMEM;
2820
2821         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
2822                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
2823         if (!ca->journal.bio)
2824                 return -ENOMEM;
2825
2826         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2827         if (!ja->buckets)
2828                 return -ENOMEM;
2829
2830         for (i = 0; i < ja->nr; i++)
2831                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
2832
2833         return 0;
2834 }
2835
2836 void bch2_fs_journal_exit(struct journal *j)
2837 {
2838         kvpfree(j->buf[1].data, j->buf[1].size);
2839         kvpfree(j->buf[0].data, j->buf[0].size);
2840         free_fifo(&j->pin);
2841 }
2842
2843 int bch2_fs_journal_init(struct journal *j)
2844 {
2845         static struct lock_class_key res_key;
2846
2847         spin_lock_init(&j->lock);
2848         spin_lock_init(&j->pin_lock);
2849         init_waitqueue_head(&j->wait);
2850         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
2851         INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
2852         mutex_init(&j->blacklist_lock);
2853         INIT_LIST_HEAD(&j->seq_blacklist);
2854         spin_lock_init(&j->devs.lock);
2855         mutex_init(&j->reclaim_lock);
2856
2857         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
2858
2859         j->buf[0].size          = JOURNAL_ENTRY_SIZE_MIN;
2860         j->buf[1].size          = JOURNAL_ENTRY_SIZE_MIN;
2861         j->write_delay_ms       = 100;
2862         j->reclaim_delay_ms     = 100;
2863
2864         bkey_extent_init(&j->key);
2865
2866         atomic64_set(&j->reservations.counter,
2867                 ((union journal_res_state)
2868                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
2869
2870         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
2871             !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
2872             !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL)))
2873                 return -ENOMEM;
2874
2875         j->pin.front = j->pin.back = 1;
2876
2877         return 0;
2878 }