libbcachefs/journal.c (bcachefs-tools-debian, "Update bcachefs sources to edf5f38218 bcachefs: Refactor superblock code")
1 /*
2  * bcachefs journalling code, for btree insertions
3  *
4  * Copyright 2012 Google, Inc.
5  */
6
7 #include "bcachefs.h"
8 #include "alloc.h"
9 #include "bkey_methods.h"
10 #include "buckets.h"
11 #include "btree_gc.h"
12 #include "btree_update.h"
13 #include "btree_update_interior.h"
14 #include "btree_io.h"
15 #include "checksum.h"
16 #include "debug.h"
17 #include "error.h"
18 #include "extents.h"
19 #include "io.h"
20 #include "keylist.h"
21 #include "journal.h"
22 #include "replicas.h"
23 #include "super-io.h"
24 #include "vstructs.h"
25
26 #include <trace/events/bcachefs.h>
27
28 static void journal_write(struct closure *);
29 static void journal_reclaim_fast(struct journal *);
30 static void journal_pin_add_entry(struct journal *,
31                                   struct journal_entry_pin_list *,
32                                   struct journal_entry_pin *,
33                                   journal_pin_flush_fn);
34
35 static inline void journal_wake(struct journal *j)
36 {
37         wake_up(&j->wait);
38         closure_wake_up(&j->async_wait);
39 }
40
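/*
 * The journal alternates between two in-memory buffers: j->reservations.idx
 * selects the buffer currently open for new entries, the other holds the
 * previous entry, which may still be being written out:
 */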
41 static inline struct journal_buf *journal_cur_buf(struct journal *j)
42 {
43         return j->buf + j->reservations.idx;
44 }
45
46 static inline struct journal_buf *journal_prev_buf(struct journal *j)
47 {
48         return j->buf + !j->reservations.idx;
49 }
50
51 /* Sequence number of oldest dirty journal entry */
52
53 static inline u64 journal_last_seq(struct journal *j)
54 {
55         return j->pin.front;
56 }
57
58 static inline u64 journal_cur_seq(struct journal *j)
59 {
60         BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq));
61
62         return j->pin.back - 1;
63 }
64
65 static inline u64 journal_pin_seq(struct journal *j,
66                                   struct journal_entry_pin_list *pin_list)
67 {
68         return fifo_entry_idx_abs(&j->pin, pin_list);
69 }
70
71 u64 bch2_journal_pin_seq(struct journal *j, struct journal_entry_pin *pin)
72 {
73         u64 ret = 0;
74
75         spin_lock(&j->lock);
76         if (journal_pin_active(pin))
77                 ret = journal_pin_seq(j, pin->pin_list);
78         spin_unlock(&j->lock);
79
80         return ret;
81 }
82
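/*
 * Append an entry to a journal buffer without taking a journal reservation -
 * used for entries (e.g. btree roots, blacklist records) added outside the
 * normal reservation path:
 */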
83 static inline void bch2_journal_add_entry_noreservation(struct journal_buf *buf,
84                                  unsigned type, enum btree_id id,
85                                  unsigned level,
86                                  const void *data, size_t u64s)
87 {
88         struct jset *jset = buf->data;
89
90         bch2_journal_add_entry_at(buf, le32_to_cpu(jset->u64s),
91                                   type, id, level, data, u64s);
92         le32_add_cpu(&jset->u64s, jset_u64s(u64s));
93 }
94
95 static struct jset_entry *bch2_journal_find_entry(struct jset *j, unsigned type,
96                                                  enum btree_id id)
97 {
98         struct jset_entry *entry;
99
100         for_each_jset_entry_type(entry, j, type)
101                 if (entry->btree_id == id)
102                         return entry;
103
104         return NULL;
105 }
106
107 struct bkey_i *bch2_journal_find_btree_root(struct bch_fs *c, struct jset *j,
108                                            enum btree_id id, unsigned *level)
109 {
110         struct bkey_i *k;
111         struct jset_entry *entry =
112                 bch2_journal_find_entry(j, JOURNAL_ENTRY_BTREE_ROOT, id);
113
114         if (!entry)
115                 return NULL;
116
117         if (!entry->u64s)
118                 return ERR_PTR(-EINVAL);
119
120         k = entry->start;
121         *level = entry->level;
123         return k;
124 }
125
126 static void bch2_journal_add_btree_root(struct journal_buf *buf,
127                                        enum btree_id id, struct bkey_i *k,
128                                        unsigned level)
129 {
130         bch2_journal_add_entry_noreservation(buf,
131                               JOURNAL_ENTRY_BTREE_ROOT, id, level,
132                               k, k->k.u64s);
133 }
134
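/*
 * Flush function for a blacklist entry's journal pin: rewrites any btree nodes
 * that still carry the blacklisted sequence number, waits for matching pending
 * interior node frees, then drops the pin and frees the blacklist entry:
 */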
135 static void journal_seq_blacklist_flush(struct journal *j,
136                                 struct journal_entry_pin *pin, u64 seq)
137 {
138         struct bch_fs *c =
139                 container_of(j, struct bch_fs, journal);
140         struct journal_seq_blacklist *bl =
141                 container_of(pin, struct journal_seq_blacklist, pin);
142         struct blacklisted_node n;
143         struct closure cl;
144         unsigned i;
145         int ret;
146
147         closure_init_stack(&cl);
148
149         for (i = 0;; i++) {
150                 struct btree_iter iter;
151                 struct btree *b;
152
153                 mutex_lock(&j->blacklist_lock);
154                 if (i >= bl->nr_entries) {
155                         mutex_unlock(&j->blacklist_lock);
156                         break;
157                 }
158                 n = bl->entries[i];
159                 mutex_unlock(&j->blacklist_lock);
160
161                 __bch2_btree_iter_init(&iter, c, n.btree_id, n.pos, 0, 0, 0);
162
163                 b = bch2_btree_iter_peek_node(&iter);
164
165                 /* The node might have already been rewritten: */
166
167                 if (b->data->keys.seq == n.seq) {
168                         ret = bch2_btree_node_rewrite(c, &iter, n.seq, 0);
169                         if (ret) {
170                                 bch2_btree_iter_unlock(&iter);
171                                 bch2_fs_fatal_error(c,
172                                         "error %i rewriting btree node with blacklisted journal seq",
173                                         ret);
174                                 bch2_journal_halt(j);
175                                 return;
176                         }
177                 }
178
179                 bch2_btree_iter_unlock(&iter);
180         }
181
182         for (i = 0;; i++) {
183                 struct btree_update *as;
184                 struct pending_btree_node_free *d;
185
186                 mutex_lock(&j->blacklist_lock);
187                 if (i >= bl->nr_entries) {
188                         mutex_unlock(&j->blacklist_lock);
189                         break;
190                 }
191                 n = bl->entries[i];
192                 mutex_unlock(&j->blacklist_lock);
193 redo_wait:
194                 mutex_lock(&c->btree_interior_update_lock);
195
196                 /*
197                  * Is the node on the list of pending interior node updates -
198                  * being freed? If so, wait for that to finish:
199                  */
200                 for_each_pending_btree_node_free(c, as, d)
201                         if (n.seq       == d->seq &&
202                             n.btree_id  == d->btree_id &&
203                             !d->level &&
204                             !bkey_cmp(n.pos, d->key.k.p)) {
205                                 closure_wait(&as->wait, &cl);
206                                 mutex_unlock(&c->btree_interior_update_lock);
207                                 closure_sync(&cl);
208                                 goto redo_wait;
209                         }
210
211                 mutex_unlock(&c->btree_interior_update_lock);
212         }
213
214         mutex_lock(&j->blacklist_lock);
215
216         bch2_journal_pin_drop(j, &bl->pin);
217         list_del(&bl->list);
218         kfree(bl->entries);
219         kfree(bl);
220
221         mutex_unlock(&j->blacklist_lock);
222 }
223
224 static struct journal_seq_blacklist *
225 journal_seq_blacklist_find(struct journal *j, u64 seq)
226 {
227         struct journal_seq_blacklist *bl;
228
229         lockdep_assert_held(&j->blacklist_lock);
230
231         list_for_each_entry(bl, &j->seq_blacklist, list)
232                 if (seq == bl->seq)
233                         return bl;
234
235         return NULL;
236 }
237
238 static struct journal_seq_blacklist *
239 bch2_journal_seq_blacklisted_new(struct journal *j, u64 seq)
240 {
241         struct journal_seq_blacklist *bl;
242
243         lockdep_assert_held(&j->blacklist_lock);
244
245         /*
246          * When we start the journal, bch2_journal_start() will skip over @seq:
247          */
248
249         bl = kzalloc(sizeof(*bl), GFP_KERNEL);
250         if (!bl)
251                 return NULL;
252
253         bl->seq = seq;
254         list_add_tail(&bl->list, &j->seq_blacklist);
255         return bl;
256 }
257
258 /*
259  * Returns true if @seq is newer than the most recent journal entry that got
260  * written, and data corresponding to @seq should be ignored - also marks @seq
261  * as blacklisted so that on future restarts the corresponding data will still
262  * be ignored:
263  */
264 int bch2_journal_seq_should_ignore(struct bch_fs *c, u64 seq, struct btree *b)
265 {
266         struct journal *j = &c->journal;
267         struct journal_seq_blacklist *bl = NULL;
268         struct blacklisted_node *n;
269         u64 journal_seq, i;
270         int ret = 0;
271
272         if (!seq)
273                 return 0;
274
275         spin_lock(&j->lock);
276         journal_seq = journal_cur_seq(j);
277         spin_unlock(&j->lock);
278
279         /* Interior updates aren't journalled: */
280         BUG_ON(b->level);
281         BUG_ON(seq > journal_seq && test_bit(BCH_FS_INITIAL_GC_DONE, &c->flags));
282
283         /*
284          * Decrease this back to j->seq + 2 when we next rev the on disk format:
285          * increasing it temporarily to work around a bug in old kernels
286          */
287         bch2_fs_inconsistent_on(seq > journal_seq + 4, c,
288                          "bset journal seq too far in the future: %llu > %llu",
289                          seq, journal_seq);
290
291         if (seq <= journal_seq &&
292             list_empty_careful(&j->seq_blacklist))
293                 return 0;
294
295         mutex_lock(&j->blacklist_lock);
296
297         if (seq <= journal_seq) {
298                 bl = journal_seq_blacklist_find(j, seq);
299                 if (!bl)
300                         goto out;
301         } else {
302                 bch_verbose(c, "btree node %u:%llu:%llu has future journal sequence number %llu, blacklisting",
303                             b->btree_id, b->key.k.p.inode, b->key.k.p.offset, seq);
304
305                 for (i = journal_seq + 1; i <= seq; i++) {
306                         bl = journal_seq_blacklist_find(j, i) ?:
307                                 bch2_journal_seq_blacklisted_new(j, i);
308                         if (!bl) {
309                                 ret = -ENOMEM;
310                                 goto out;
311                         }
312                 }
313         }
314
315         for (n = bl->entries; n < bl->entries + bl->nr_entries; n++)
316                 if (b->data->keys.seq   == n->seq &&
317                     b->btree_id         == n->btree_id &&
318                     !bkey_cmp(b->key.k.p, n->pos))
319                         goto found_entry;
320
321         if (!bl->nr_entries ||
322             is_power_of_2(bl->nr_entries)) {
323                 n = krealloc(bl->entries,
324                              max(bl->nr_entries * 2, 8UL) * sizeof(*n),
325                              GFP_KERNEL);
326                 if (!n) {
327                         ret = -ENOMEM;
328                         goto out;
329                 }
330                 bl->entries = n;
331         }
332
333         bl->entries[bl->nr_entries++] = (struct blacklisted_node) {
334                 .seq            = b->data->keys.seq,
335                 .btree_id       = b->btree_id,
336                 .pos            = b->key.k.p,
337         };
338 found_entry:
339         ret = 1;
340 out:
341         mutex_unlock(&j->blacklist_lock);
342         return ret;
343 }
344
345 /*
346  * Journal replay/recovery:
347  *
348  * This code is all driven from bch2_fs_start(); we first read the journal
349  * entries, do some other stuff, then we mark all the keys in the journal
350  * entries (same as garbage collection would), then we replay them - reinserting
351  * them into the cache in precisely the same order as they appear in the
352  * journal.
353  *
354  * We only journal keys that go in leaf nodes, which simplifies things quite a
355  * bit.
356  */
357
358 struct journal_list {
359         struct closure          cl;
360         struct mutex            lock;
361         struct list_head        *head;
362         int                     ret;
363 };
364
365 #define JOURNAL_ENTRY_ADD_OK            0
366 #define JOURNAL_ENTRY_ADD_OUT_OF_RANGE  5
367
368 /*
369  * Given a journal entry we just read, add it to the list of journal entries to
370  * be replayed:
371  */
372 static int journal_entry_add(struct bch_fs *c, struct bch_dev *ca,
373                              struct journal_list *jlist, struct jset *j)
374 {
375         struct journal_replay *i, *pos;
376         struct list_head *where;
377         size_t bytes = vstruct_bytes(j);
378         __le64 last_seq;
379         int ret;
380
381         last_seq = !list_empty(jlist->head)
382                 ? list_last_entry(jlist->head, struct journal_replay,
383                                   list)->j.last_seq
384                 : 0;
385
386         /* Is this entry older than the range we need? */
387         if (le64_to_cpu(j->seq) < le64_to_cpu(last_seq)) {
388                 ret = JOURNAL_ENTRY_ADD_OUT_OF_RANGE;
389                 goto out;
390         }
391
392         /* Drop entries we don't need anymore */
393         list_for_each_entry_safe(i, pos, jlist->head, list) {
394                 if (le64_to_cpu(i->j.seq) >= le64_to_cpu(j->last_seq))
395                         break;
396                 list_del(&i->list);
397                 kvpfree(i, offsetof(struct journal_replay, j) +
398                         vstruct_bytes(&i->j));
399         }
400
401         list_for_each_entry_reverse(i, jlist->head, list) {
402                 /* Duplicate? */
403                 if (le64_to_cpu(j->seq) == le64_to_cpu(i->j.seq)) {
404                         fsck_err_on(bytes != vstruct_bytes(&i->j) ||
405                                     memcmp(j, &i->j, bytes), c,
406                                     "found duplicate but non identical journal entries (seq %llu)",
407                                     le64_to_cpu(j->seq));
408                         goto found;
409                 }
410
411                 if (le64_to_cpu(j->seq) > le64_to_cpu(i->j.seq)) {
412                         where = &i->list;
413                         goto add;
414                 }
415         }
416
417         where = jlist->head;
418 add:
419         i = kvpmalloc(offsetof(struct journal_replay, j) + bytes, GFP_KERNEL);
420         if (!i) {
421                 ret = -ENOMEM;
422                 goto out;
423         }
424
425         list_add(&i->list, where);
426         i->devs.nr = 0;
427         memcpy(&i->j, j, bytes);
428 found:
429         if (!bch2_dev_list_has_dev(i->devs, ca->dev_idx))
430                 bch2_dev_list_add_dev(&i->devs, ca->dev_idx);
431         else
432                 fsck_err_on(1, c, "duplicate journal entries on same device");
433         ret = JOURNAL_ENTRY_ADD_OK;
434 out:
435 fsck_err:
436         return ret;
437 }
438
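/* Nonce for journal checksumming/encryption, derived from the entry's seq: */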
439 static struct nonce journal_nonce(const struct jset *jset)
440 {
441         return (struct nonce) {{
442                 [0] = 0,
443                 [1] = ((__le32 *) &jset->seq)[0],
444                 [2] = ((__le32 *) &jset->seq)[1],
445                 [3] = BCH_NONCE_JOURNAL,
446         }};
447 }
448
449 /* this fills in a range with empty jset_entries: */
450 static void journal_entry_null_range(void *start, void *end)
451 {
452         struct jset_entry *entry;
453
454         for (entry = start; entry != end; entry = vstruct_next(entry))
455                 memset(entry, 0, sizeof(*entry));
456 }
457
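/*
 * Validate a single key in a journal entry; invalid keys are dropped or the
 * entry truncated in place so the remaining keys can still be replayed:
 */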
458 static int journal_validate_key(struct bch_fs *c, struct jset *jset,
459                                 struct jset_entry *entry,
460                                 struct bkey_i *k, enum bkey_type key_type,
461                                 const char *type)
462 {
463         void *next = vstruct_next(entry);
464         const char *invalid;
465         char buf[160];
466         int ret = 0;
467
468         if (mustfix_fsck_err_on(!k->k.u64s, c,
469                         "invalid %s in journal: k->u64s 0", type)) {
470                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
471                 journal_entry_null_range(vstruct_next(entry), next);
472                 return 0;
473         }
474
475         if (mustfix_fsck_err_on((void *) bkey_next(k) >
476                                 (void *) vstruct_next(entry), c,
477                         "invalid %s in journal: extends past end of journal entry",
478                         type)) {
479                 entry->u64s = cpu_to_le16((u64 *) k - entry->_data);
480                 journal_entry_null_range(vstruct_next(entry), next);
481                 return 0;
482         }
483
484         if (mustfix_fsck_err_on(k->k.format != KEY_FORMAT_CURRENT, c,
485                         "invalid %s in journal: bad format %u",
486                         type, k->k.format)) {
487                 le16_add_cpu(&entry->u64s, -k->k.u64s);
488                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
489                 journal_entry_null_range(vstruct_next(entry), next);
490                 return 0;
491         }
492
493         if (JSET_BIG_ENDIAN(jset) != CPU_BIG_ENDIAN)
494                 bch2_bkey_swab(key_type, NULL, bkey_to_packed(k));
495
496         invalid = bch2_bkey_invalid(c, key_type, bkey_i_to_s_c(k));
497         if (invalid) {
498                 bch2_bkey_val_to_text(c, key_type, buf, sizeof(buf),
499                                      bkey_i_to_s_c(k));
500                 mustfix_fsck_err(c, "invalid %s in journal: %s\n%s",
501                                  type, invalid, buf);
502
503                 le16_add_cpu(&entry->u64s, -k->k.u64s);
504                 memmove(k, bkey_next(k), next - (void *) bkey_next(k));
505                 journal_entry_null_range(vstruct_next(entry), next);
506                 return 0;
507         }
508 fsck_err:
509         return ret;
510 }
511
512 #define JOURNAL_ENTRY_REREAD    5
513 #define JOURNAL_ENTRY_NONE      6
514 #define JOURNAL_ENTRY_BAD       7
515
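/*
 * On read, validation failures are treated as fixable fsck errors; when
 * validating before a write they indicate corrupt metadata and fail the
 * validation outright:
 */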
516 #define journal_entry_err(c, msg, ...)                                  \
517 ({                                                                      \
518         if (write == READ) {                                            \
519                 mustfix_fsck_err(c, msg, ##__VA_ARGS__);                \
520         } else {                                                        \
521                 bch_err(c, "detected corrupt metadata before write:\n"  \
522                         msg, ##__VA_ARGS__);                            \
523                 ret = BCH_FSCK_ERRORS_NOT_FIXED;                        \
524                 goto fsck_err;                                          \
525         }                                                               \
526         true;                                                           \
527 })
528
529 #define journal_entry_err_on(cond, c, msg, ...)                         \
530         ((cond) ? journal_entry_err(c, msg, ##__VA_ARGS__) : false)
531
532 static int journal_entry_validate_entries(struct bch_fs *c, struct jset *jset,
533                                           int write)
534 {
535         struct jset_entry *entry;
536         int ret = 0;
537
538         vstruct_for_each(jset, entry) {
539                 void *next = vstruct_next(entry);
540                 struct bkey_i *k;
541
542                 if (journal_entry_err_on(vstruct_next(entry) >
543                                          vstruct_last(jset), c,
544                                 "journal entry extends past end of jset")) {
545                         jset->u64s = cpu_to_le32((u64 *) entry - jset->_data);
546                         break;
547                 }
548
549                 switch (entry->type) {
550                 case JOURNAL_ENTRY_BTREE_KEYS:
551                         vstruct_for_each(entry, k) {
552                                 ret = journal_validate_key(c, jset, entry, k,
553                                                 bkey_type(entry->level,
554                                                           entry->btree_id),
555                                                 "key");
556                                 if (ret)
557                                         goto fsck_err;
558                         }
559                         break;
560
561                 case JOURNAL_ENTRY_BTREE_ROOT:
562                         k = entry->start;
563
564                         if (journal_entry_err_on(!entry->u64s ||
565                                         le16_to_cpu(entry->u64s) != k->k.u64s, c,
566                                         "invalid btree root journal entry: wrong number of keys")) {
567                                 /*
568                                  * we don't want to null out this jset_entry,
569                                  * just the contents, so that later we can tell
570                                  * we were _supposed_ to have a btree root
571                                  */
572                                 entry->u64s = 0;
573                                 journal_entry_null_range(vstruct_next(entry), next);
574                                 continue;
575                         }
576
577                         ret = journal_validate_key(c, jset, entry, k,
578                                                    BKEY_TYPE_BTREE, "btree root");
579                         if (ret)
580                                 goto fsck_err;
581                         break;
582
583                 case JOURNAL_ENTRY_PRIO_PTRS:
584                         break;
585
586                 case JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED:
587                         if (journal_entry_err_on(le16_to_cpu(entry->u64s) != 1, c,
588                                 "invalid journal seq blacklist entry: bad size")) {
589                                 journal_entry_null_range(entry,
590                                                 vstruct_next(entry));
591                         }
592
593                         break;
594                 default:
595                         journal_entry_err(c, "invalid journal entry type %u",
596                                           entry->type);
597                         journal_entry_null_range(entry, vstruct_next(entry));
598                         break;
599                 }
600         }
601
602 fsck_err:
603         return ret;
604 }
605
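/*
 * Validate a jset header: magic, version, size against the bucket, checksum
 * type and checksum; decrypts the payload on success. Returns 0, an error, or
 * one of the JOURNAL_ENTRY_* codes above:
 */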
606 static int journal_entry_validate(struct bch_fs *c,
607                                   struct jset *jset, u64 sector,
608                                   unsigned bucket_sectors_left,
609                                   unsigned sectors_read,
610                                   int write)
611 {
612         size_t bytes = vstruct_bytes(jset);
613         struct bch_csum csum;
614         int ret = 0;
615
616         if (le64_to_cpu(jset->magic) != jset_magic(c))
617                 return JOURNAL_ENTRY_NONE;
618
619         if (le32_to_cpu(jset->version) != BCACHE_JSET_VERSION) {
620                 bch_err(c, "unknown journal entry version %u",
621                         le32_to_cpu(jset->version));
622                 return BCH_FSCK_UNKNOWN_VERSION;
623         }
624
625         if (journal_entry_err_on(bytes > bucket_sectors_left << 9, c,
626                         "journal entry too big (%zu bytes), sector %llu",
627                         bytes, sector)) {
628                 /* XXX: note we might have missing journal entries */
629                 return JOURNAL_ENTRY_BAD;
630         }
631
632         if (bytes > sectors_read << 9)
633                 return JOURNAL_ENTRY_REREAD;
634
635         if (fsck_err_on(!bch2_checksum_type_valid(c, JSET_CSUM_TYPE(jset)), c,
636                         "journal entry with unknown csum type %llu sector %llu",
637                         JSET_CSUM_TYPE(jset), sector))
638                 return JOURNAL_ENTRY_BAD;
639
640         csum = csum_vstruct(c, JSET_CSUM_TYPE(jset), journal_nonce(jset), jset);
641         if (journal_entry_err_on(bch2_crc_cmp(csum, jset->csum), c,
642                         "journal checksum bad, sector %llu", sector)) {
643                 /* XXX: retry IO, when we start retrying checksum errors */
644                 /* XXX: note we might have missing journal entries */
645                 return JOURNAL_ENTRY_BAD;
646         }
647
648         bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
649                     jset->encrypted_start,
650                     vstruct_end(jset) - (void *) jset->encrypted_start);
651
652         if (journal_entry_err_on(le64_to_cpu(jset->last_seq) > le64_to_cpu(jset->seq), c,
653                         "invalid journal entry: last_seq > seq"))
654                 jset->last_seq = jset->seq;
655
656         return 0;
657 fsck_err:
658         return ret;
659 }
660
661 struct journal_read_buf {
662         void            *data;
663         size_t          size;
664 };
665
666 static int journal_read_buf_realloc(struct journal_read_buf *b,
667                                     size_t new_size)
668 {
669         void *n;
670
671         /* the bios are sized for this many pages, max: */
672         if (new_size > JOURNAL_ENTRY_SIZE_MAX)
673                 return -ENOMEM;
674
675         new_size = roundup_pow_of_two(new_size);
676         n = kvpmalloc(new_size, GFP_KERNEL);
677         if (!n)
678                 return -ENOMEM;
679
680         kvpfree(b->data, b->size);
681         b->data = n;
682         b->size = new_size;
683         return 0;
684 }
685
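/*
 * Read through a single journal bucket, validating each entry found and adding
 * it to the list of entries to replay; tracks the highest sequence number seen
 * via @seq:
 */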
686 static int journal_read_bucket(struct bch_dev *ca,
687                                struct journal_read_buf *buf,
688                                struct journal_list *jlist,
689                                unsigned bucket, u64 *seq, bool *entries_found)
690 {
691         struct bch_fs *c = ca->fs;
692         struct journal_device *ja = &ca->journal;
693         struct bio *bio = ja->bio;
694         struct jset *j = NULL;
695         unsigned sectors, sectors_read = 0;
696         u64 offset = bucket_to_sector(ca, ja->buckets[bucket]),
697             end = offset + ca->mi.bucket_size;
698         bool saw_bad = false;
699         int ret = 0;
700
701         pr_debug("reading %u", bucket);
702
703         while (offset < end) {
704                 if (!sectors_read) {
705 reread:                 sectors_read = min_t(unsigned,
706                                 end - offset, buf->size >> 9);
707
708                         bio_reset(bio);
709                         bio_set_dev(bio, ca->disk_sb.bdev);
710                         bio->bi_iter.bi_sector  = offset;
711                         bio->bi_iter.bi_size    = sectors_read << 9;
712                         bio_set_op_attrs(bio, REQ_OP_READ, 0);
713                         bch2_bio_map(bio, buf->data);
714
715                         ret = submit_bio_wait(bio);
716
717                         if (bch2_dev_io_err_on(ret, ca,
718                                                "journal read from sector %llu",
719                                                offset) ||
720                             bch2_meta_read_fault("journal"))
721                                 return -EIO;
722
723                         j = buf->data;
724                 }
725
726                 ret = journal_entry_validate(c, j, offset,
727                                         end - offset, sectors_read,
728                                         READ);
729                 switch (ret) {
730                 case BCH_FSCK_OK:
731                         break;
732                 case JOURNAL_ENTRY_REREAD:
733                         if (vstruct_bytes(j) > buf->size) {
734                                 ret = journal_read_buf_realloc(buf,
735                                                         vstruct_bytes(j));
736                                 if (ret)
737                                         return ret;
738                         }
739                         goto reread;
740                 case JOURNAL_ENTRY_NONE:
741                         if (!saw_bad)
742                                 return 0;
743                         sectors = c->opts.block_size;
744                         goto next_block;
745                 case JOURNAL_ENTRY_BAD:
746                         saw_bad = true;
747                         sectors = c->opts.block_size;
748                         goto next_block;
749                 default:
750                         return ret;
751                 }
752
753                 /*
754                  * This happens sometimes if we don't have discards on -
755                  * when we've partially overwritten a bucket with new
756                  * journal entries. We don't need the rest of the
757                  * bucket:
758                  */
759                 if (le64_to_cpu(j->seq) < ja->bucket_seq[bucket])
760                         return 0;
761
762                 ja->bucket_seq[bucket] = le64_to_cpu(j->seq);
763
764                 mutex_lock(&jlist->lock);
765                 ret = journal_entry_add(c, ca, jlist, j);
766                 mutex_unlock(&jlist->lock);
767
768                 switch (ret) {
769                 case JOURNAL_ENTRY_ADD_OK:
770                         *entries_found = true;
771                         break;
772                 case JOURNAL_ENTRY_ADD_OUT_OF_RANGE:
773                         break;
774                 default:
775                         return ret;
776                 }
777
778                 if (le64_to_cpu(j->seq) > *seq)
779                         *seq = le64_to_cpu(j->seq);
780
781                 sectors = vstruct_sectors(j, c->block_bits);
782 next_block:
783                 pr_debug("next");
784                 offset          += sectors;
785                 sectors_read    -= sectors;
786                 j = ((void *) j) + (sectors << 9);
787         }
788
789         return 0;
790 }
791
792 static void bch2_journal_read_device(struct closure *cl)
793 {
794 #define read_bucket(b)                                                  \
795         ({                                                              \
796                 bool entries_found = false;                             \
797                 ret = journal_read_bucket(ca, &buf, jlist, b, &seq,     \
798                                           &entries_found);              \
799                 if (ret)                                                \
800                         goto err;                                       \
801                 __set_bit(b, bitmap);                                   \
802                 entries_found;                                          \
803          })
804
805         struct journal_device *ja =
806                 container_of(cl, struct journal_device, read);
807         struct bch_dev *ca = container_of(ja, struct bch_dev, journal);
808         struct journal_list *jlist =
809                 container_of(cl->parent, struct journal_list, cl);
810         struct request_queue *q = bdev_get_queue(ca->disk_sb.bdev);
811         struct journal_read_buf buf = { NULL, 0 };
812
813         DECLARE_BITMAP(bitmap, ja->nr);
814         unsigned i, l, r;
815         u64 seq = 0;
816         int ret;
817
818         if (!ja->nr)
819                 goto out;
820
821         bitmap_zero(bitmap, ja->nr);
822         ret = journal_read_buf_realloc(&buf, PAGE_SIZE);
823         if (ret)
824                 goto err;
825
826         pr_debug("%u journal buckets", ja->nr);
827
828         /*
829          * If the device supports discard but not secure discard, we can't do
830          * the fancy fibonacci hash/binary search because the live journal
831          * entries might not form a contiguous range:
832          */
833         for (i = 0; i < ja->nr; i++)
834                 read_bucket(i);
835         goto search_done;
836
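        /*
         * Note: the unconditional goto above means the non-rotational device
         * heuristic and binary search below are currently not reached - every
         * bucket has already been read by the linear pass:
         */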
837         if (!blk_queue_nonrot(q))
838                 goto linear_scan;
839
840         /*
841          * Read journal buckets ordered by golden ratio hash to quickly
842          * find a sequence of buckets with valid journal entries
843          */
844         for (i = 0; i < ja->nr; i++) {
845                 l = (i * 2654435769U) % ja->nr;
846
847                 if (test_bit(l, bitmap))
848                         break;
849
850                 if (read_bucket(l))
851                         goto bsearch;
852         }
853
854         /*
855          * If that fails, check all the buckets we haven't checked
856          * already
857          */
858         pr_debug("falling back to linear search");
859 linear_scan:
860         for (l = find_first_zero_bit(bitmap, ja->nr);
861              l < ja->nr;
862              l = find_next_zero_bit(bitmap, ja->nr, l + 1))
863                 if (read_bucket(l))
864                         goto bsearch;
865
866         /* no journal entries on this device? */
867         if (l == ja->nr)
868                 goto out;
869 bsearch:
870         /* Binary search */
871         r = find_next_bit(bitmap, ja->nr, l + 1);
872         pr_debug("starting binary search, l %u r %u", l, r);
873
874         while (l + 1 < r) {
875                 unsigned m = (l + r) >> 1;
876                 u64 cur_seq = seq;
877
878                 read_bucket(m);
879
880                 if (cur_seq != seq)
881                         l = m;
882                 else
883                         r = m;
884         }
885
886 search_done:
887         /*
888          * Find the journal bucket with the highest sequence number:
889          *
890  * If there are duplicate journal entries in multiple buckets (which
891          * definitely isn't supposed to happen, but...) - make sure to start
892          * cur_idx at the last of those buckets, so we don't deadlock trying to
893          * allocate
894          */
895         seq = 0;
896
897         for (i = 0; i < ja->nr; i++)
898                 if (ja->bucket_seq[i] >= seq &&
899                     ja->bucket_seq[i] != ja->bucket_seq[(i + 1) % ja->nr]) {
900                         /*
901                          * When journal_next_bucket() goes to allocate for
902                          * the first time, it'll use the bucket after
903                          * ja->cur_idx
904                          */
905                         ja->cur_idx = i;
906                         seq = ja->bucket_seq[i];
907                 }
908
909         /*
910          * Set last_idx to indicate the entire journal is full and needs to be
911          * reclaimed - journal reclaim will immediately reclaim whatever isn't
912          * pinned when it first runs:
913          */
914         ja->last_idx = (ja->cur_idx + 1) % ja->nr;
915
916         /*
917          * Read buckets in reverse order until we stop finding more journal
918          * entries:
919          */
920         for (i = (ja->cur_idx + ja->nr - 1) % ja->nr;
921              i != ja->cur_idx;
922              i = (i + ja->nr - 1) % ja->nr)
923                 if (!test_bit(i, bitmap) &&
924                     !read_bucket(i))
925                         break;
926 out:
927         kvpfree(buf.data, buf.size);
928         percpu_ref_put(&ca->io_ref);
929         closure_return(cl);
930 err:
931         mutex_lock(&jlist->lock);
932         jlist->ret = ret;
933         mutex_unlock(&jlist->lock);
934         goto out;
935 #undef read_bucket
936 }
937
938 void bch2_journal_entries_free(struct list_head *list)
939 {
940
941         while (!list_empty(list)) {
942                 struct journal_replay *i =
943                         list_first_entry(list, struct journal_replay, list);
944                 list_del(&i->list);
945                 kvpfree(i, offsetof(struct journal_replay, j) +
946                         vstruct_bytes(&i->j));
947         }
948 }
949
950 static int journal_seq_blacklist_read(struct journal *j,
951                                       struct journal_replay *i,
952                                       struct journal_entry_pin_list *p)
953 {
954         struct bch_fs *c = container_of(j, struct bch_fs, journal);
955         struct jset_entry *entry;
956         struct journal_seq_blacklist *bl;
957         u64 seq;
958
959         for_each_jset_entry_type(entry, &i->j,
960                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED) {
961                 struct jset_entry_blacklist *bl_entry =
962                         container_of(entry, struct jset_entry_blacklist, entry);
963                 seq = le64_to_cpu(bl_entry->seq);
964
965                 bch_verbose(c, "blacklisting existing journal seq %llu", seq);
966
967                 bl = bch2_journal_seq_blacklisted_new(j, seq);
968                 if (!bl)
969                         return -ENOMEM;
970
971                 journal_pin_add_entry(j, p, &bl->pin,
972                                   journal_seq_blacklist_flush);
973                 bl->written = true;
974         }
975
976         return 0;
977 }
978
979 static inline bool journal_has_keys(struct list_head *list)
980 {
981         struct journal_replay *i;
982         struct jset_entry *entry;
983         struct bkey_i *k, *_n;
984
985         list_for_each_entry(i, list, list)
986                 for_each_jset_key(k, _n, entry, &i->j)
987                         return true;
988
989         return false;
990 }
991
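/*
 * Read the journal from all member devices, validate what was found, and set
 * up the journal pin fifo and sequence number blacklists in preparation for
 * replay:
 */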
992 int bch2_journal_read(struct bch_fs *c, struct list_head *list)
993 {
994         struct journal *j = &c->journal;
995         struct journal_list jlist;
996         struct journal_replay *i;
997         struct journal_entry_pin_list *p;
998         struct bch_dev *ca;
999         u64 cur_seq, end_seq, seq;
1000         unsigned iter, keys = 0, entries = 0;
1001         size_t nr;
1002         bool degraded = false;
1003         int ret = 0;
1004
1005         closure_init_stack(&jlist.cl);
1006         mutex_init(&jlist.lock);
1007         jlist.head = list;
1008         jlist.ret = 0;
1009
1010         for_each_member_device(ca, c, iter) {
1011                 if (!(bch2_dev_has_data(c, ca) & (1 << BCH_DATA_JOURNAL)))
1012                         continue;
1013
1014                 if ((ca->mi.state == BCH_MEMBER_STATE_RW ||
1015                      ca->mi.state == BCH_MEMBER_STATE_RO) &&
1016                     percpu_ref_tryget(&ca->io_ref))
1017                         closure_call(&ca->journal.read,
1018                                      bch2_journal_read_device,
1019                                      system_unbound_wq,
1020                                      &jlist.cl);
1021                 else
1022                         degraded = true;
1023         }
1024
1025         closure_sync(&jlist.cl);
1026
1027         if (jlist.ret)
1028                 return jlist.ret;
1029
1030         if (list_empty(list)) {
1031                 bch_err(c, "no journal entries found");
1032                 return BCH_FSCK_REPAIR_IMPOSSIBLE;
1033         }
1034
1035         fsck_err_on(c->sb.clean && journal_has_keys(list), c,
1036                     "filesystem marked clean but journal has keys to replay");
1037
1038         list_for_each_entry(i, list, list) {
1039                 ret = journal_entry_validate_entries(c, &i->j, READ);
1040                 if (ret)
1041                         goto fsck_err;
1042
1043                 /*
1044                  * If we're mounting in degraded mode - if we didn't read all
1045                  * the devices - this is wrong:
1046                  */
1047
1048                 if (!degraded &&
1049                     (test_bit(BCH_FS_REBUILD_REPLICAS, &c->flags) ||
1050                      fsck_err_on(!bch2_replicas_marked(c, BCH_DATA_JOURNAL,
1051                                                        i->devs), c,
1052                                  "superblock not marked as containing replicas (type %u)",
1053                                  BCH_DATA_JOURNAL))) {
1054                         ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, i->devs);
1055                         if (ret)
1056                                 return ret;
1057                 }
1058         }
1059
1060         i = list_last_entry(list, struct journal_replay, list);
1061
1062         nr = le64_to_cpu(i->j.seq) - le64_to_cpu(i->j.last_seq) + 1;
1063
1064         if (nr > j->pin.size) {
1065                 free_fifo(&j->pin);
1066                 init_fifo(&j->pin, roundup_pow_of_two(nr), GFP_KERNEL);
1067                 if (!j->pin.data) {
1068                         bch_err(c, "error reallocating journal fifo (%zu open entries)", nr);
1069                         return -ENOMEM;
1070                 }
1071         }
1072
1073         atomic64_set(&j->seq, le64_to_cpu(i->j.seq));
1074         j->last_seq_ondisk = le64_to_cpu(i->j.last_seq);
1075
1076         j->pin.front    = le64_to_cpu(i->j.last_seq);
1077         j->pin.back     = le64_to_cpu(i->j.seq) + 1;
1078
1079         fifo_for_each_entry_ptr(p, &j->pin, seq) {
1080                 INIT_LIST_HEAD(&p->list);
1081                 INIT_LIST_HEAD(&p->flushed);
1082                 atomic_set(&p->count, 0);
1083                 p->devs.nr = 0;
1084         }
1085
1086         mutex_lock(&j->blacklist_lock);
1087
1088         list_for_each_entry(i, list, list) {
1089                 p = journal_seq_pin(j, le64_to_cpu(i->j.seq));
1090
1091                 atomic_set(&p->count, 1);
1092                 p->devs = i->devs;
1093
1094                 if (journal_seq_blacklist_read(j, i, p)) {
1095                         mutex_unlock(&j->blacklist_lock);
1096                         return -ENOMEM;
1097                 }
1098         }
1099
1100         mutex_unlock(&j->blacklist_lock);
1101
1102         cur_seq = journal_last_seq(j);
1103         end_seq = le64_to_cpu(list_last_entry(list,
1104                                 struct journal_replay, list)->j.seq);
1105
1106         list_for_each_entry(i, list, list) {
1107                 struct jset_entry *entry;
1108                 struct bkey_i *k, *_n;
1109                 bool blacklisted;
1110
1111                 mutex_lock(&j->blacklist_lock);
1112                 while (cur_seq < le64_to_cpu(i->j.seq) &&
1113                        journal_seq_blacklist_find(j, cur_seq))
1114                         cur_seq++;
1115
1116                 blacklisted = journal_seq_blacklist_find(j,
1117                                                          le64_to_cpu(i->j.seq));
1118                 mutex_unlock(&j->blacklist_lock);
1119
1120                 fsck_err_on(blacklisted, c,
1121                             "found blacklisted journal entry %llu",
1122                             le64_to_cpu(i->j.seq));
1123
1124                 fsck_err_on(le64_to_cpu(i->j.seq) != cur_seq, c,
1125                         "journal entries %llu-%llu missing! (replaying %llu-%llu)",
1126                         cur_seq, le64_to_cpu(i->j.seq) - 1,
1127                         journal_last_seq(j), end_seq);
1128
1129                 cur_seq = le64_to_cpu(i->j.seq) + 1;
1130
1131                 for_each_jset_key(k, _n, entry, &i->j)
1132                         keys++;
1133                 entries++;
1134         }
1135
1136         bch_info(c, "journal read done, %i keys in %i entries, seq %llu",
1137                  keys, entries, journal_cur_seq(j));
1138 fsck_err:
1139         return ret;
1140 }
1141
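/*
 * Mark keys found in the journal (those belonging to btrees with pointers),
 * the same way initial btree GC marks keys in the btree:
 */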
1142 int bch2_journal_mark(struct bch_fs *c, struct list_head *list)
1143 {
1144         struct bkey_i *k, *n;
1145         struct jset_entry *j;
1146         struct journal_replay *r;
1147         int ret;
1148
1149         list_for_each_entry(r, list, list)
1150                 for_each_jset_key(k, n, j, &r->j) {
1151                         enum bkey_type type = bkey_type(j->level, j->btree_id);
1152                         struct bkey_s_c k_s_c = bkey_i_to_s_c(k);
1153
1154                         if (btree_type_has_ptrs(type)) {
1155                                 ret = bch2_btree_mark_key_initial(c, type, k_s_c);
1156                                 if (ret)
1157                                         return ret;
1158                         }
1159                 }
1160
1161         return 0;
1162 }
1163
1164 static bool journal_entry_is_open(struct journal *j)
1165 {
1166         return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
1167 }
1168
1169 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
1170 {
1171         struct journal_buf *w = journal_prev_buf(j);
1172
1173         atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
1174
1175         if (!need_write_just_set &&
1176             test_bit(JOURNAL_NEED_WRITE, &j->flags))
1177                 __bch2_time_stats_update(j->delay_time,
1178                                         j->need_write_time);
1179 #if 0
1180         closure_call(&j->io, journal_write, NULL, NULL);
1181 #else
1182         /* Shut sparse up: */
1183         closure_init(&j->io, NULL);
1184         set_closure_fn(&j->io, journal_write, NULL);
1185         journal_write(&j->io);
1186 #endif
1187 }
1188
1189 static void journal_pin_new_entry(struct journal *j, int count)
1190 {
1191         struct journal_entry_pin_list *p;
1192
1193         /*
1194          * The fifo_push() needs to happen at the same time as j->seq is
1195          * incremented for journal_last_seq() to be calculated correctly
1196          */
1197         atomic64_inc(&j->seq);
1198         p = fifo_push_ref(&j->pin);
1199
1200         INIT_LIST_HEAD(&p->list);
1201         INIT_LIST_HEAD(&p->flushed);
1202         atomic_set(&p->count, count);
1203         p->devs.nr = 0;
1204 }
1205
1206 static void bch2_journal_buf_init(struct journal *j)
1207 {
1208         struct journal_buf *buf = journal_cur_buf(j);
1209
1210         memset(buf->has_inode, 0, sizeof(buf->has_inode));
1211
1212         memset(buf->data, 0, sizeof(*buf->data));
1213         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
1214         buf->data->u64s = 0;
1215 }
1216
1217 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
1218 {
1219         return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1220 }
1221
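/*
 * Close the currently open journal entry, switch to the other buffer and kick
 * off the write of the entry just closed. Returns JOURNAL_UNLOCKED (having
 * dropped j->lock) on success, otherwise indicates why the entry couldn't be
 * closed:
 */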
1222 static enum {
1223         JOURNAL_ENTRY_ERROR,
1224         JOURNAL_ENTRY_INUSE,
1225         JOURNAL_ENTRY_CLOSED,
1226         JOURNAL_UNLOCKED,
1227 } journal_buf_switch(struct journal *j, bool need_write_just_set)
1228 {
1229         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1230         struct journal_buf *buf;
1231         union journal_res_state old, new;
1232         u64 v = atomic64_read(&j->reservations.counter);
1233
1234         lockdep_assert_held(&j->lock);
1235
1236         do {
1237                 old.v = new.v = v;
1238                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
1239                         return JOURNAL_ENTRY_CLOSED;
1240
1241                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1242                         return JOURNAL_ENTRY_ERROR;
1243
1244                 if (new.prev_buf_unwritten)
1245                         return JOURNAL_ENTRY_INUSE;
1246
1247                 /*
1248                  * avoid race between setting buf->data->u64s and
1249                  * journal_res_put starting write:
1250                  */
1251                 journal_state_inc(&new);
1252
1253                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
1254                 new.idx++;
1255                 new.prev_buf_unwritten = 1;
1256
1257                 BUG_ON(journal_state_count(new, new.idx));
1258         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1259                                        old.v, new.v)) != old.v);
1260
1261         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
1262
1263         buf = &j->buf[old.idx];
1264         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
1265
1266         j->prev_buf_sectors =
1267                 vstruct_blocks_plus(buf->data, c->block_bits,
1268                                     journal_entry_u64s_reserve(buf)) *
1269                 c->opts.block_size;
1270         BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
1271
1272         journal_reclaim_fast(j);
1273         /* XXX: why set this here, and not in journal_write()? */
1274         buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));
1275
1276         journal_pin_new_entry(j, 1);
1277
1278         bch2_journal_buf_init(j);
1279
1280         cancel_delayed_work(&j->write_work);
1281         spin_unlock(&j->lock);
1282
1283         if (c->bucket_journal_seq > 1 << 14) {
1284                 c->bucket_journal_seq = 0;
1285                 bch2_bucket_seq_cleanup(c);
1286         }
1287
1288         /* ugh - might be called from __journal_res_get() under wait_event() */
1289         __set_current_state(TASK_RUNNING);
1290         bch2_journal_buf_put(j, old.idx, need_write_just_set);
1291
1292         return JOURNAL_UNLOCKED;
1293 }
1294
1295 void bch2_journal_halt(struct journal *j)
1296 {
1297         union journal_res_state old, new;
1298         u64 v = atomic64_read(&j->reservations.counter);
1299
1300         do {
1301                 old.v = new.v = v;
1302                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1303                         return;
1304
1305                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
1306         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1307                                        old.v, new.v)) != old.v);
1308
1309         journal_wake(j);
1310         closure_wake_up(&journal_cur_buf(j)->wait);
1311         closure_wake_up(&journal_prev_buf(j)->wait);
1312 }
1313
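/* Number of journal buckets on @ca available for new journal writes: */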
1314 static unsigned journal_dev_buckets_available(struct journal *j,
1315                                               struct bch_dev *ca)
1316 {
1317         struct journal_device *ja = &ca->journal;
1318         unsigned next = (ja->cur_idx + 1) % ja->nr;
1319         unsigned available = (ja->last_idx + ja->nr - next) % ja->nr;
1320
1321         /*
1322          * Hack to avoid a deadlock during journal replay:
1323          * journal replay might require setting a new btree
1324          * root, which requires writing another journal entry -
1325          * thus, if the journal is full (and this happens when
1326          * replaying the first journal bucket's entries) we're
1327          * screwed.
1328          *
1329          * So don't let the journal fill up unless we're in
1330          * replay:
1331          */
1332         if (test_bit(JOURNAL_REPLAY_DONE, &j->flags))
1333                 available = max((int) available - 2, 0);
1334
1335         /*
1336          * Don't use the last bucket unless writing the new last_seq
1337          * will make another bucket available:
1338          */
1339         if (ja->bucket_seq[ja->last_idx] >= journal_last_seq(j))
1340                 available = max((int) available - 1, 0);
1341
1342         return available;
1343 }
1344
1345 /* returns number of sectors available for next journal entry: */
1346 static int journal_entry_sectors(struct journal *j)
1347 {
1348         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1349         struct bch_dev *ca;
1350         struct bkey_s_extent e = bkey_i_to_s_extent(&j->key);
1351         unsigned sectors_available = UINT_MAX;
1352         unsigned i, nr_online = 0, nr_devs = 0;
1353
1354         lockdep_assert_held(&j->lock);
1355
1356         rcu_read_lock();
1357         for_each_member_device_rcu(ca, c, i,
1358                                    &c->rw_devs[BCH_DATA_JOURNAL]) {
1359                 struct journal_device *ja = &ca->journal;
1360                 unsigned buckets_required = 0;
1361
1362                 if (!ja->nr)
1363                         continue;
1364
1365                 sectors_available = min_t(unsigned, sectors_available,
1366                                           ca->mi.bucket_size);
1367
1368                 /*
1369                  * Note that we don't allocate the space for a journal entry
1370                  * until we write it out - thus, if we haven't started the write
1371                  * for the previous entry we have to make sure we have space for
1372                  * it too:
1373                  */
1374                 if (bch2_extent_has_device(e.c, ca->dev_idx)) {
1375                         if (j->prev_buf_sectors > ja->sectors_free)
1376                                 buckets_required++;
1377
1378                         if (j->prev_buf_sectors + sectors_available >
1379                             ja->sectors_free)
1380                                 buckets_required++;
1381                 } else {
1382                         if (j->prev_buf_sectors + sectors_available >
1383                             ca->mi.bucket_size)
1384                                 buckets_required++;
1385
1386                         buckets_required++;
1387                 }
1388
1389                 if (journal_dev_buckets_available(j, ca) >= buckets_required)
1390                         nr_devs++;
1391                 nr_online++;
1392         }
1393         rcu_read_unlock();
1394
1395         if (nr_online < c->opts.metadata_replicas_required)
1396                 return -EROFS;
1397
1398         if (nr_devs < min_t(unsigned, nr_online, c->opts.metadata_replicas))
1399                 return 0;
1400
1401         return sectors_available;
1402 }
1403
1404 /*
1405  * should _only_ be called from journal_res_get() - when we actually want a
1406  * journal reservation - journal entry is open means journal is dirty:
1407  *
1408  * returns:
1409  * 1:           success
1410  * 0:           journal currently full (must wait)
1411  * -EROFS:      insufficient rw devices
1412  * -EIO:        journal error
1413  */
1414 static int journal_entry_open(struct journal *j)
1415 {
1416         struct journal_buf *buf = journal_cur_buf(j);
1417         union journal_res_state old, new;
1418         ssize_t u64s;
1419         int sectors;
1420         u64 v;
1421
1422         lockdep_assert_held(&j->lock);
1423         BUG_ON(journal_entry_is_open(j));
1424
1425         if (!fifo_free(&j->pin))
1426                 return 0;
1427
1428         sectors = journal_entry_sectors(j);
1429         if (sectors <= 0)
1430                 return sectors;
1431
1432         buf->disk_sectors       = sectors;
1433
1434         sectors = min_t(unsigned, sectors, buf->size >> 9);
1435         j->cur_buf_sectors      = sectors;
1436
1437         u64s = (sectors << 9) / sizeof(u64);
1438
1439         /* Subtract the journal header */
1440         u64s -= sizeof(struct jset) / sizeof(u64);
1441         /*
1442          * Btree roots, prio pointers don't get added until right before we do
1443          * the write:
1444          */
1445         u64s -= journal_entry_u64s_reserve(buf);
1446         u64s  = max_t(ssize_t, 0L, u64s);
1447
1448         BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
1449
1450         if (u64s <= le32_to_cpu(buf->data->u64s))
1451                 return 0;
1452
1453         /*
1454          * Must be set before marking the journal entry as open:
1455          */
1456         j->cur_entry_u64s = u64s;
1457
1458         v = atomic64_read(&j->reservations.counter);
1459         do {
1460                 old.v = new.v = v;
1461
1462                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
1463                         return -EIO;
1464
1465                 /* Handle any already added entries */
1466                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
1467         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
1468                                        old.v, new.v)) != old.v);
1469
1470         if (j->res_get_blocked_start)
1471                 __bch2_time_stats_update(j->blocked_time,
1472                                         j->res_get_blocked_start);
1473         j->res_get_blocked_start = 0;
1474
1475         mod_delayed_work(system_freezable_wq,
1476                          &j->write_work,
1477                          msecs_to_jiffies(j->write_delay_ms));
1478         journal_wake(j);
1479         return 1;
1480 }
1481
1482 void bch2_journal_start(struct bch_fs *c)
1483 {
1484         struct journal *j = &c->journal;
1485         struct journal_seq_blacklist *bl;
1486         u64 new_seq = 0;
1487
1488         list_for_each_entry(bl, &j->seq_blacklist, list)
1489                 new_seq = max(new_seq, bl->seq);
1490
1491         spin_lock(&j->lock);
1492
1493         set_bit(JOURNAL_STARTED, &j->flags);
1494
1495         while (journal_cur_seq(j) < new_seq)
1496                 journal_pin_new_entry(j, 0);
1497
1498         /*
1499          * journal_buf_switch() only inits the next journal entry when it
1500          * closes an open journal entry - the very first journal entry gets
1501          * initialized here:
1502          */
1503         journal_pin_new_entry(j, 1);
1504         bch2_journal_buf_init(j);
1505
1506         spin_unlock(&j->lock);
1507
1508         /*
1509          * Adding entries to the next journal entry before allocating space on
1510          * disk for the next journal entry - this is ok, because these entries
1511          * only have to go down with the next journal entry we write:
1512          */
1513         list_for_each_entry(bl, &j->seq_blacklist, list)
1514                 if (!bl->written) {
1515                         bch2_journal_add_entry_noreservation(journal_cur_buf(j),
1516                                         JOURNAL_ENTRY_JOURNAL_SEQ_BLACKLISTED,
1517                                         0, 0, &bl->seq, 1);
1518
1519                         journal_pin_add_entry(j,
1520                                               &fifo_peek_back(&j->pin),
1521                                               &bl->pin,
1522                                               journal_seq_blacklist_flush);
1523                         bl->written = true;
1524                 }
1525
1526         queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
1527 }
1528
1529 int bch2_journal_replay(struct bch_fs *c, struct list_head *list)
1530 {
1531         struct journal *j = &c->journal;
1532         struct bkey_i *k, *_n;
1533         struct jset_entry *entry;
1534         struct journal_replay *i, *n;
1535         int ret = 0;
1536
1537         list_for_each_entry_safe(i, n, list, list) {
1538                 j->replay_pin_list =
1539                         journal_seq_pin(j, le64_to_cpu(i->j.seq));
1540
1541                 for_each_jset_key(k, _n, entry, &i->j) {
1542
1543                         if (entry->btree_id == BTREE_ID_ALLOC) {
1544                                 /*
1545                                  * allocation code handles replay for
1546                                  * BTREE_ID_ALLOC keys:
1547                                  */
1548                                 ret = bch2_alloc_replay_key(c, k->k.p);
1549                         } else {
1550                                 /*
1551                                  * We might cause compressed extents to be
1552                                  * split, so we need to pass in a
1553                                  * disk_reservation:
1554                                  */
1555                                 struct disk_reservation disk_res =
1556                                         bch2_disk_reservation_init(c, 0);
1557
1558                                 ret = bch2_btree_insert(c, entry->btree_id, k,
1559                                                         &disk_res, NULL, NULL,
1560                                                         BTREE_INSERT_NOFAIL|
1561                                                         BTREE_INSERT_JOURNAL_REPLAY);
1562                         }
1563
1564                         if (ret) {
1565                                 bch_err(c, "journal replay: error %d while replaying key",
1566                                         ret);
1567                                 goto err;
1568                         }
1569
1570                         cond_resched();
1571                 }
1572
1573                 if (atomic_dec_and_test(&j->replay_pin_list->count))
1574                         journal_wake(j);
1575         }
1576
1577         j->replay_pin_list = NULL;
1578
1579         bch2_journal_set_replay_done(j);
1580         ret = bch2_journal_flush_all_pins(j);
1581 err:
1582         bch2_journal_entries_free(list);
1583         return ret;
1584 }
1585
1586 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
1587                                          bool new_fs, struct closure *cl)
1588 {
1589         struct bch_fs *c = ca->fs;
1590         struct journal_device *ja = &ca->journal;
1591         struct bch_sb_field_journal *journal_buckets;
1592         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
1593         int ret = 0;
1594
1595         /* don't handle reducing nr of buckets yet: */
1596         if (nr <= ja->nr)
1597                 return 0;
1598
1599         ret = -ENOMEM;
1600         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1601         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
1602         if (!new_buckets || !new_bucket_seq)
1603                 goto err;
1604
1605         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
1606                                 nr + sizeof(*journal_buckets) / sizeof(u64));
1607         if (!journal_buckets)
1608                 goto err;
1609
1610         if (c)
1611                 spin_lock(&c->journal.lock);
1612
1613         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
1614         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
1615         swap(new_buckets,       ja->buckets);
1616         swap(new_bucket_seq,    ja->bucket_seq);
1617
1618         if (c)
1619                 spin_unlock(&c->journal.lock);
1620
1621         while (ja->nr < nr) {
1622                 struct open_bucket *ob = NULL;
1623                 long bucket;
1624
1625                 if (new_fs) {
1626                         bucket = bch2_bucket_alloc_new_fs(ca);
1627                         if (bucket < 0) {
1628                                 ret = -ENOSPC;
1629                                 goto err;
1630                         }
1631                 } else {
1632                         int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
1633                         if (ob_idx < 0) {
1634                                 ret = cl ? -EAGAIN : -ENOSPC;
1635                                 goto err;
1636                         }
1637
1638                         ob = c->open_buckets + ob_idx;
1639                         bucket = sector_to_bucket(ca, ob->ptr.offset);
1640                 }
1641
1642                 if (c)
1643                         spin_lock(&c->journal.lock);
1644
1645                 __array_insert_item(ja->buckets,                ja->nr, ja->last_idx);
1646                 __array_insert_item(ja->bucket_seq,             ja->nr, ja->last_idx);
1647                 __array_insert_item(journal_buckets->buckets,   ja->nr, ja->last_idx);
1648
1649                 ja->buckets[ja->last_idx] = bucket;
1650                 ja->bucket_seq[ja->last_idx] = 0;
1651                 journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);
1652
1653                 if (ja->last_idx < ja->nr) {
1654                         if (ja->cur_idx >= ja->last_idx)
1655                                 ja->cur_idx++;
1656                         ja->last_idx++;
1657                 }
1658                 ja->nr++;
1659
1660                 if (c)
1661                         spin_unlock(&c->journal.lock);
1662
1663                 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
1664                                 ca->mi.bucket_size,
1665                                 gc_phase(GC_PHASE_SB),
1666                                 new_fs
1667                                 ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
1668                                 : 0);
1669
1670                 if (!new_fs)
1671                         bch2_open_bucket_put(c, ob);
1672         }
1673
1674         ret = 0;
1675 err:
1676         kfree(new_bucket_seq);
1677         kfree(new_buckets);
1678
1679         return ret;
1680 }
1681
1682 /*
1683  * Allocate more journal space at runtime - not currently making use of it, but
1684  * the code works:
1685  */
1686 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
1687                                 unsigned nr)
1688 {
1689         struct journal_device *ja = &ca->journal;
1690         struct closure cl;
1691         unsigned current_nr;
1692         int ret;
1693
1694         closure_init_stack(&cl);
1695
1696         do {
1697                 struct disk_reservation disk_res = { 0, 0 };
1698
1699                 closure_sync(&cl);
1700
1701                 mutex_lock(&c->sb_lock);
1702                 current_nr = ja->nr;
1703
1704                 /*
1705                  * note: journal buckets aren't really counted as _sectors_ used yet, so
1706                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
1707                  * when space used goes up without a reservation - but we do need the
1708                  * reservation to ensure we'll actually be able to allocate:
1709                  */
1710
1711                 if (bch2_disk_reservation_get(c, &disk_res,
1712                                 bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
1713                         mutex_unlock(&c->sb_lock);
1714                         return -ENOSPC;
1715                 }
1716
1717                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
1718
1719                 bch2_disk_reservation_put(c, &disk_res);
1720
1721                 if (ja->nr != current_nr)
1722                         bch2_write_super(c);
1723                 mutex_unlock(&c->sb_lock);
1724         } while (ret == -EAGAIN);
1725
1726         return ret;
1727 }
1728
1729 int bch2_dev_journal_alloc(struct bch_dev *ca)
1730 {
1731         unsigned nr;
1732
1733         if (dynamic_fault("bcachefs:add:journal_alloc"))
1734                 return -ENOMEM;
1735
1736         /*
1737          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
1738          * is smaller:
1739          */
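             /*
              * For example (sizes purely illustrative): with 1MB buckets
              * (2048 sectors), (1 << 20) / ca->mi.bucket_size caps the journal
              * at 512 buckets, while with 256k buckets the 1 << 10 cap (1024
              * buckets) wins instead; ca->mi.nbuckets >> 8 targets roughly
              * 1/256th of the device's buckets.
              */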
1740         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
1741                      BCH_JOURNAL_BUCKETS_MIN,
1742                      min(1 << 10,
1743                          (1 << 20) / ca->mi.bucket_size));
1744
1745         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
1746 }
1747
1748 /* Journalling */
1749
1750 /**
1751  * journal_reclaim_fast - do the fast part of journal reclaim
1752  *
1753  * Called from IO submission context, does not block. Pops journal entries
1754  * whose pin counts have dropped to zero off the front of the pin FIFO,
1755  * advancing last_seq; heavier work is left to journal_reclaim_work().
1756  */
1757 static void journal_reclaim_fast(struct journal *j)
1758 {
1759         struct journal_entry_pin_list temp;
1760         bool popped = false;
1761
1762         lockdep_assert_held(&j->lock);
1763
1764         /*
1765          * Unpin journal entries whose reference counts reached zero, meaning
1766          * all btree nodes got written out
1767          */
1768         while (!atomic_read(&fifo_peek_front(&j->pin).count)) {
1769                 BUG_ON(!list_empty(&fifo_peek_front(&j->pin).list));
1770                 BUG_ON(!fifo_pop(&j->pin, temp));
1771                 popped = true;
1772         }
1773
1774         if (popped)
1775                 journal_wake(j);
1776 }
1777
1778 /*
1779  * Journal entry pinning - machinery for holding a reference on a given journal
1780  * entry, marking it as dirty:
1781  */
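     /*
      * Rough sketch of typical pin usage (the real callers live in the btree
      * update paths):
      *
      *    bch2_journal_pin_add(j, &res, &pin, flush_fn);
      *    ...
      *    bch2_journal_pin_drop(j, &pin);
      *
      * flush_fn is what journal reclaim calls when it needs the pin released
      * sooner, e.g. to force out whatever the pin is keeping dirty.
      */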
1782
1783 static inline void __journal_pin_add(struct journal *j,
1784                                      struct journal_entry_pin_list *pin_list,
1785                                      struct journal_entry_pin *pin,
1786                                      journal_pin_flush_fn flush_fn)
1787 {
1788         BUG_ON(journal_pin_active(pin));
1789         BUG_ON(!atomic_read(&pin_list->count));
1790
1791         atomic_inc(&pin_list->count);
1792         pin->pin_list   = pin_list;
1793         pin->flush      = flush_fn;
1794
1795         if (flush_fn)
1796                 list_add(&pin->list, &pin_list->list);
1797         else
1798                 INIT_LIST_HEAD(&pin->list);
1799
1800         /*
1801          * If the journal is currently full, we might want to call flush_fn
1802          * immediately:
1803          */
1804         journal_wake(j);
1805 }
1806
1807 static void journal_pin_add_entry(struct journal *j,
1808                                   struct journal_entry_pin_list *pin_list,
1809                                   struct journal_entry_pin *pin,
1810                                   journal_pin_flush_fn flush_fn)
1811 {
1812         spin_lock(&j->lock);
1813         __journal_pin_add(j, pin_list, pin, flush_fn);
1814         spin_unlock(&j->lock);
1815 }
1816
1817 void bch2_journal_pin_add(struct journal *j,
1818                           struct journal_res *res,
1819                           struct journal_entry_pin *pin,
1820                           journal_pin_flush_fn flush_fn)
1821 {
1822         struct journal_entry_pin_list *pin_list = res->ref
1823                 ? journal_seq_pin(j, res->seq)
1824                 : j->replay_pin_list;
1825
1826         spin_lock(&j->lock);
1827         __journal_pin_add(j, pin_list, pin, flush_fn);
1828         spin_unlock(&j->lock);
1829 }
1830
1831 static inline void __journal_pin_drop(struct journal *j,
1832                                       struct journal_entry_pin *pin)
1833 {
1834         struct journal_entry_pin_list *pin_list = pin->pin_list;
1835
1836         if (!journal_pin_active(pin))
1837                 return;
1838
1839         pin->pin_list = NULL;
1840         list_del_init(&pin->list);
1841
1842         /*
1843          * Unpinning a journal entry may make journal_write_alloc() succeed, if
1844          * writing a new last_seq will now make another bucket available:
1845          */
1846         if (atomic_dec_and_test(&pin_list->count) &&
1847             pin_list == &fifo_peek_front(&j->pin))
1848                 journal_reclaim_fast(j);
1849 }
1850
1851 void bch2_journal_pin_drop(struct journal *j,
1852                           struct journal_entry_pin *pin)
1853 {
1854         spin_lock(&j->lock);
1855         __journal_pin_drop(j, pin);
1856         spin_unlock(&j->lock);
1857 }
1858
1859 void bch2_journal_pin_add_if_older(struct journal *j,
1860                                   struct journal_entry_pin *src_pin,
1861                                   struct journal_entry_pin *pin,
1862                                   journal_pin_flush_fn flush_fn)
1863 {
1864         spin_lock(&j->lock);
1865
1866         if (journal_pin_active(src_pin) &&
1867             (!journal_pin_active(pin) ||
1868              journal_pin_seq(j, src_pin->pin_list) <
1869              journal_pin_seq(j, pin->pin_list))) {
1870                 __journal_pin_drop(j, pin);
1871                 __journal_pin_add(j, src_pin->pin_list, pin, flush_fn);
1872         }
1873
1874         spin_unlock(&j->lock);
1875 }
1876
1877 static struct journal_entry_pin *
1878 __journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1879 {
1880         struct journal_entry_pin_list *pin_list;
1881         struct journal_entry_pin *ret;
1882         u64 iter;
1883
1884         /* no need to iterate over empty fifo entries: */
1885         journal_reclaim_fast(j);
1886
1887         fifo_for_each_entry_ptr(pin_list, &j->pin, iter) {
1888                 if (iter > seq_to_flush)
1889                         break;
1890
1891                 ret = list_first_entry_or_null(&pin_list->list,
1892                                 struct journal_entry_pin, list);
1893                 if (ret) {
1894                         /* must be list_del_init(), see bch2_journal_pin_drop() */
1895                         list_move(&ret->list, &pin_list->flushed);
1896                         *seq = iter;
1897                         return ret;
1898                 }
1899         }
1900
1901         return NULL;
1902 }
1903
1904 static struct journal_entry_pin *
1905 journal_get_next_pin(struct journal *j, u64 seq_to_flush, u64 *seq)
1906 {
1907         struct journal_entry_pin *ret;
1908
1909         spin_lock(&j->lock);
1910         ret = __journal_get_next_pin(j, seq_to_flush, seq);
1911         spin_unlock(&j->lock);
1912
1913         return ret;
1914 }
1915
1916 static int journal_flush_done(struct journal *j, u64 seq_to_flush,
1917                               struct journal_entry_pin **pin,
1918                               u64 *pin_seq)
1919 {
1920         int ret;
1921
1922         *pin = NULL;
1923
1924         ret = bch2_journal_error(j);
1925         if (ret)
1926                 return ret;
1927
1928         spin_lock(&j->lock);
1929         /*
1930          * If journal replay hasn't completed, the unreplayed journal entries
1931          * hold refs on their corresponding sequence numbers
1932          */
1933         ret = (*pin = __journal_get_next_pin(j, seq_to_flush, pin_seq)) != NULL ||
1934                 !test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
1935                 journal_last_seq(j) > seq_to_flush ||
1936                 (fifo_used(&j->pin) == 1 &&
1937                  atomic_read(&fifo_peek_front(&j->pin).count) == 1);
1938         spin_unlock(&j->lock);
1939
1940         return ret;
1941 }
1942
1943 int bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
1944 {
1945         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1946         struct journal_entry_pin *pin;
1947         u64 pin_seq;
1948         bool flush;
1949
1950         if (!test_bit(JOURNAL_STARTED, &j->flags))
1951                 return 0;
1952 again:
1953         wait_event(j->wait, journal_flush_done(j, seq_to_flush, &pin, &pin_seq));
1954         if (pin) {
1955                 /* flushing a journal pin might cause a new one to be added: */
1956                 pin->flush(j, pin, pin_seq);
1957                 goto again;
1958         }
1959
1960         spin_lock(&j->lock);
1961         flush = journal_last_seq(j) != j->last_seq_ondisk ||
1962                 (seq_to_flush == U64_MAX && c->btree_roots_dirty);
1963         spin_unlock(&j->lock);
1964
1965         return flush ? bch2_journal_meta(j) : 0;
1966 }
1967
1968 int bch2_journal_flush_all_pins(struct journal *j)
1969 {
1970         return bch2_journal_flush_pins(j, U64_MAX);
1971 }
1972
1973 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
1974 {
1975         bool ret;
1976
1977         spin_lock(&j->lock);
1978         ret = ja->nr &&
1979                 (ja->last_idx != ja->cur_idx &&
1980                  ja->bucket_seq[ja->last_idx] < j->last_seq_ondisk);
1981         spin_unlock(&j->lock);
1982
1983         return ret;
1984 }
1985
1986 /**
1987  * journal_reclaim_work - free up journal buckets
1988  *
1989  * Background journal reclaim writes out btree nodes. It should be run
1990  * early enough so that we never completely run out of journal buckets.
1991  *
1992  * High watermarks for triggering background reclaim:
1993  * - FIFO has fewer than 512 entries left
1994  * - fewer than 25% journal buckets free
1995  *
1996  * Background reclaim runs until low watermarks are reached:
1997  * - FIFO has more than 1024 entries left
1998  * - more than 50% journal buckets free
1999  *
2000  * As long as a reclaim can complete in the time it takes to fill up
2001  * 512 journal entries or 25% of all journal buckets, then
2002  * journal_write_alloc() should not stall.
2003  */
2004 static void journal_reclaim_work(struct work_struct *work)
2005 {
2006         struct bch_fs *c = container_of(to_delayed_work(work),
2007                                 struct bch_fs, journal.reclaim_work);
2008         struct journal *j = &c->journal;
2009         struct bch_dev *ca;
2010         struct journal_entry_pin *pin;
2011         u64 seq, seq_to_flush = 0;
2012         unsigned iter, bucket_to_flush;
2013         unsigned long next_flush;
2014         bool reclaim_lock_held = false, need_flush;
2015
2016         /*
2017          * Advance last_idx to point to the oldest journal entry containing
2018          * btree node updates that have not yet been written out
2019          */
2020         for_each_rw_member(ca, c, iter) {
2021                 struct journal_device *ja = &ca->journal;
2022
2023                 if (!ja->nr)
2024                         continue;
2025
2026                 while (should_discard_bucket(j, ja)) {
2027                         if (!reclaim_lock_held) {
2028                                 /*
2029                                  * ugh:
2030                                  * might be called from __journal_res_get()
2031                                  * under wait_event() - have to go back to
2032                                  * TASK_RUNNING before doing something that
2033                                  * would block, but only if we're doing work:
2034                                  */
2035                                 __set_current_state(TASK_RUNNING);
2036
2037                                 mutex_lock(&j->reclaim_lock);
2038                                 reclaim_lock_held = true;
2039                                 /* recheck under reclaim_lock: */
2040                                 continue;
2041                         }
2042
2043                         if (ca->mi.discard &&
2044                             blk_queue_discard(bdev_get_queue(ca->disk_sb.bdev)))
2045                                 blkdev_issue_discard(ca->disk_sb.bdev,
2046                                         bucket_to_sector(ca,
2047                                                 ja->buckets[ja->last_idx]),
2048                                         ca->mi.bucket_size, GFP_NOIO, 0);
2049
2050                         spin_lock(&j->lock);
2051                         ja->last_idx = (ja->last_idx + 1) % ja->nr;
2052                         spin_unlock(&j->lock);
2053
2054                         journal_wake(j);
2055                 }
2056
2057                 /*
2058                  * Write out enough btree nodes to free up 50% journal
2059                  * buckets
2060                  */
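                     /*
                      * e.g. (numbers illustrative): with ja->nr == 8 and
                      * ja->cur_idx == 2, bucket_to_flush == (2 + 4) % 8 == 6;
                      * flushing pins up to ja->bucket_seq[6] lets last_seq
                      * advance far enough that roughly half the buckets can
                      * be discarded and reused.
                      */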
2061                 spin_lock(&j->lock);
2062                 bucket_to_flush = (ja->cur_idx + (ja->nr >> 1)) % ja->nr;
2063                 seq_to_flush = max_t(u64, seq_to_flush,
2064                                      ja->bucket_seq[bucket_to_flush]);
2065                 spin_unlock(&j->lock);
2066         }
2067
2068         if (reclaim_lock_held)
2069                 mutex_unlock(&j->reclaim_lock);
2070
2071         /* Also flush if the pin fifo is more than half full */
2072         spin_lock(&j->lock);
2073         seq_to_flush = max_t(s64, seq_to_flush,
2074                              (s64) journal_cur_seq(j) -
2075                              (j->pin.size >> 1));
2076         spin_unlock(&j->lock);
2077
2078         /*
2079          * If it's been longer than j->reclaim_delay_ms since we last flushed,
2080          * make sure to flush at least one journal pin:
2081          */
2082         next_flush = j->last_flushed + msecs_to_jiffies(j->reclaim_delay_ms);
2083         need_flush = time_after(jiffies, next_flush);
2084
2085         while ((pin = journal_get_next_pin(j, need_flush
2086                                            ? U64_MAX
2087                                            : seq_to_flush, &seq))) {
2088                 __set_current_state(TASK_RUNNING);
2089                 pin->flush(j, pin, seq);
2090                 need_flush = false;
2091
2092                 j->last_flushed = jiffies;
2093         }
2094
2095         if (!test_bit(BCH_FS_RO, &c->flags))
2096                 queue_delayed_work(system_freezable_wq, &j->reclaim_work,
2097                                    msecs_to_jiffies(j->reclaim_delay_ms));
2098 }
2099
2100 /**
2101  * journal_write_alloc - pick the devices and buckets for the next journal write
2102  */
2103 static int journal_write_alloc(struct journal *j, struct journal_buf *w,
2104                                unsigned sectors)
2105 {
2106         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2107         struct bkey_s_extent e;
2108         struct bch_extent_ptr *ptr;
2109         struct journal_device *ja;
2110         struct bch_dev *ca;
2111         struct dev_alloc_list devs_sorted;
2112         unsigned i, replicas, replicas_want =
2113                 READ_ONCE(c->opts.metadata_replicas);
2114
2115         spin_lock(&j->lock);
2116         e = bkey_i_to_s_extent(&j->key);
2117
2118         /*
2119          * Drop any pointers to devices that have been removed, are no longer
2120          * empty, or filled up their current journal bucket:
2121          *
2122          * Note that a device may have had a small amount of free space (perhaps
2123          * one sector) that wasn't enough for the smallest possible journal
2124          * entry - that's why we drop pointers to devices whose free space is <=
2125          * what we need, i.e. whichever device was limiting the current entry size.
2126          */
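             /*
              * e.g. (numbers illustrative): a device with 3 sectors left in
              * its current journal bucket can't take an 8 sector entry, so
              * its pointer is dropped here and the loop below may assign it
              * a fresh bucket, if it still has free journal buckets.
              */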
2127         extent_for_each_ptr_backwards(e, ptr) {
2128                 ca = bch_dev_bkey_exists(c, ptr->dev);
2129
2130                 if (ca->mi.state != BCH_MEMBER_STATE_RW ||
2131                     ca->journal.sectors_free <= sectors)
2132                         __bch2_extent_drop_ptr(e, ptr);
2133                 else
2134                         ca->journal.sectors_free -= sectors;
2135         }
2136
2137         replicas = bch2_extent_nr_ptrs(e.c);
2138
2139         rcu_read_lock();
2140         devs_sorted = bch2_wp_alloc_list(c, &j->wp,
2141                                          &c->rw_devs[BCH_DATA_JOURNAL]);
2142
2143         for (i = 0; i < devs_sorted.nr; i++) {
2144                 ca = rcu_dereference(c->devs[devs_sorted.devs[i]]);
2145                 if (!ca)
2146                         continue;
2147
2148                 if (!ca->mi.durability)
2149                         continue;
2150
2151                 ja = &ca->journal;
2152                 if (!ja->nr)
2153                         continue;
2154
2155                 if (replicas >= replicas_want)
2156                         break;
2157
2158                 /*
2159                  * Check that we can use this device, and aren't already using
2160                  * it:
2161                  */
2162                 if (bch2_extent_has_device(e.c, ca->dev_idx) ||
2163                     !journal_dev_buckets_available(j, ca) ||
2164                     sectors > ca->mi.bucket_size)
2165                         continue;
2166
2167                 j->wp.next_alloc[ca->dev_idx] += U32_MAX;
2168                 bch2_wp_rescale(c, ca, &j->wp);
2169
2170                 ja->sectors_free = ca->mi.bucket_size - sectors;
2171                 ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
2172                 ja->bucket_seq[ja->cur_idx] = le64_to_cpu(w->data->seq);
2173
2174                 extent_ptr_append(bkey_i_to_extent(&j->key),
2175                         (struct bch_extent_ptr) {
2176                                   .offset = bucket_to_sector(ca,
2177                                         ja->buckets[ja->cur_idx]),
2178                                   .dev = ca->dev_idx,
2179                 });
2180
2181                 replicas += ca->mi.durability;
2182         }
2183         rcu_read_unlock();
2184
2185         j->prev_buf_sectors = 0;
2186
2187         bkey_copy(&w->key, &j->key);
2188         spin_unlock(&j->lock);
2189
2190         if (replicas < c->opts.metadata_replicas_required)
2191                 return -EROFS;
2192
2193         BUG_ON(!replicas);
2194
2195         return 0;
2196 }
2197
2198 static void journal_write_compact(struct jset *jset)
2199 {
2200         struct jset_entry *i, *next, *prev = NULL;
2201
2202         /*
2203          * Simple compaction, dropping empty jset_entries (from journal
2204          * reservations that weren't fully used) and merging jset_entries that
2205          * can be.
2206          *
2207          * If we wanted to be really fancy here, we could sort all the keys in
2208          * the jset and drop keys that were overwritten - probably not worth it:
2209          */
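             /*
              * For example, two adjacent JOURNAL_ENTRY_BTREE_KEYS entries for
              * the same btree id and level are concatenated into one entry
              * below, provided the combined u64s count still fits in a u16.
              */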
2210         vstruct_for_each_safe(jset, i, next) {
2211                 unsigned u64s = le16_to_cpu(i->u64s);
2212
2213                 /* Empty entry: */
2214                 if (!u64s)
2215                         continue;
2216
2217                 /* Can we merge with previous entry? */
2218                 if (prev &&
2219                     i->btree_id == prev->btree_id &&
2220                     i->level    == prev->level &&
2221                     i->type     == prev->type &&
2222                     i->type     == JOURNAL_ENTRY_BTREE_KEYS &&
2223                     le16_to_cpu(prev->u64s) + u64s <= U16_MAX) {
2224                         memmove_u64s_down(vstruct_next(prev),
2225                                           i->_data,
2226                                           u64s);
2227                         le16_add_cpu(&prev->u64s, u64s);
2228                         continue;
2229                 }
2230
2231                 /* Couldn't merge, move i into new position (after prev): */
2232                 prev = prev ? vstruct_next(prev) : jset->start;
2233                 if (i != prev)
2234                         memmove_u64s_down(prev, i, jset_u64s(u64s));
2235         }
2236
2237         prev = prev ? vstruct_next(prev) : jset->start;
2238         jset->u64s = cpu_to_le32((u64 *) prev - jset->_data);
2239 }
2240
2241 static void journal_buf_realloc(struct journal *j, struct journal_buf *buf)
2242 {
2243         /* we aren't holding j->lock: */
2244         unsigned new_size = READ_ONCE(j->buf_size_want);
2245         void *new_buf;
2246
2247         if (buf->size >= new_size)
2248                 return;
2249
2250         new_buf = kvpmalloc(new_size, GFP_NOIO|__GFP_NOWARN);
2251         if (!new_buf)
2252                 return;
2253
2254         memcpy(new_buf, buf->data, buf->size);
2255         kvpfree(buf->data, buf->size);
2256         buf->data       = new_buf;
2257         buf->size       = new_size;
2258 }
2259
2260 static void journal_write_done(struct closure *cl)
2261 {
2262         struct journal *j = container_of(cl, struct journal, io);
2263         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2264         struct journal_buf *w = journal_prev_buf(j);
2265         struct bch_devs_list devs =
2266                 bch2_extent_devs(bkey_i_to_s_c_extent(&w->key));
2267
2268         if (!devs.nr) {
2269                 bch_err(c, "unable to write journal to sufficient devices");
2270                 goto err;
2271         }
2272
2273         if (bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs))
2274                 goto err;
2275 out:
2276         __bch2_time_stats_update(j->write_time, j->write_start_time);
2277
2278         spin_lock(&j->lock);
2279         j->last_seq_ondisk = le64_to_cpu(w->data->last_seq);
2280
2281         journal_seq_pin(j, le64_to_cpu(w->data->seq))->devs = devs;
2282
2283         /*
2284          * Updating last_seq_ondisk may let journal_reclaim_work() discard more
2285          * buckets:
2286          *
2287          * Must come before signaling write completion, for
2288          * bch2_fs_journal_stop():
2289          */
2290         mod_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
2291
2292         /* also must come before signalling write completion: */
2293         closure_debug_destroy(cl);
2294
2295         BUG_ON(!j->reservations.prev_buf_unwritten);
2296         atomic64_sub(((union journal_res_state) { .prev_buf_unwritten = 1 }).v,
2297                      &j->reservations.counter);
2298
2299         closure_wake_up(&w->wait);
2300         journal_wake(j);
2301
2302         if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
2303                 mod_delayed_work(system_freezable_wq, &j->write_work, 0);
2304         spin_unlock(&j->lock);
2305         return;
2306 err:
2307         bch2_fatal_error(c);
2308         bch2_journal_halt(j);
2309         goto out;
2310 }
2311
2312 static void journal_write_endio(struct bio *bio)
2313 {
2314         struct bch_dev *ca = bio->bi_private;
2315         struct journal *j = &ca->fs->journal;
2316
2317         if (bch2_dev_io_err_on(bio->bi_status, ca, "journal write") ||
2318             bch2_meta_write_fault("journal")) {
2319                 struct journal_buf *w = journal_prev_buf(j);
2320                 unsigned long flags;
2321
2322                 spin_lock_irqsave(&j->err_lock, flags);
2323                 bch2_extent_drop_device(bkey_i_to_s_extent(&w->key), ca->dev_idx);
2324                 spin_unlock_irqrestore(&j->err_lock, flags);
2325         }
2326
2327         closure_put(&j->io);
2328         percpu_ref_put(&ca->io_ref);
2329 }
2330
2331 static void journal_write(struct closure *cl)
2332 {
2333         struct journal *j = container_of(cl, struct journal, io);
2334         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2335         struct bch_dev *ca;
2336         struct journal_buf *w = journal_prev_buf(j);
2337         struct jset *jset;
2338         struct bio *bio;
2339         struct bch_extent_ptr *ptr;
2340         unsigned i, sectors, bytes;
2341
2342         journal_buf_realloc(j, w);
2343         jset = w->data;
2344
2345         j->write_start_time = local_clock();
2346         mutex_lock(&c->btree_root_lock);
2347         for (i = 0; i < BTREE_ID_NR; i++) {
2348                 struct btree_root *r = &c->btree_roots[i];
2349
2350                 if (r->alive)
2351                         bch2_journal_add_btree_root(w, i, &r->key, r->level);
2352         }
2353         c->btree_roots_dirty = false;
2354         mutex_unlock(&c->btree_root_lock);
2355
2356         journal_write_compact(jset);
2357
2358         jset->read_clock        = cpu_to_le16(c->bucket_clock[READ].hand);
2359         jset->write_clock       = cpu_to_le16(c->bucket_clock[WRITE].hand);
2360         jset->magic             = cpu_to_le64(jset_magic(c));
2361         jset->version           = cpu_to_le32(BCACHE_JSET_VERSION);
2362
2363         SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
2364         SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
2365
2366         if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2367             journal_entry_validate_entries(c, jset, WRITE))
2368                 goto err;
2369
2370         bch2_encrypt(c, JSET_CSUM_TYPE(jset), journal_nonce(jset),
2371                     jset->encrypted_start,
2372                     vstruct_end(jset) - (void *) jset->encrypted_start);
2373
2374         jset->csum = csum_vstruct(c, JSET_CSUM_TYPE(jset),
2375                                   journal_nonce(jset), jset);
2376
2377         if (!bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)) &&
2378             journal_entry_validate_entries(c, jset, WRITE))
2379                 goto err;
2380
2381         sectors = vstruct_sectors(jset, c->block_bits);
2382         BUG_ON(sectors > j->prev_buf_sectors);
2383
2384         bytes = vstruct_bytes(w->data);
2385         memset((void *) w->data + bytes, 0, (sectors << 9) - bytes);
2386
2387         if (journal_write_alloc(j, w, sectors)) {
2388                 bch2_journal_halt(j);
2389                 bch_err(c, "Unable to allocate journal write");
2390                 bch2_fatal_error(c);
2391                 continue_at(cl, journal_write_done, system_highpri_wq);
2392         }
2393
2394         /*
2395          * XXX: we really should just disable the entire journal in nochanges
2396          * mode
2397          */
2398         if (c->opts.nochanges)
2399                 goto no_io;
2400
2401         extent_for_each_ptr(bkey_i_to_s_extent(&w->key), ptr) {
2402                 ca = bch_dev_bkey_exists(c, ptr->dev);
2403                 if (!percpu_ref_tryget(&ca->io_ref)) {
2404                         /* XXX: fix this */
2405                         bch_err(c, "missing device for journal write\n");
2406                         continue;
2407                 }
2408
2409                 this_cpu_add(ca->io_done->sectors[WRITE][BCH_DATA_JOURNAL],
2410                              sectors);
2411
2412                 bio = ca->journal.bio;
2413                 bio_reset(bio);
2414                 bio_set_dev(bio, ca->disk_sb.bdev);
2415                 bio->bi_iter.bi_sector  = ptr->offset;
2416                 bio->bi_iter.bi_size    = sectors << 9;
2417                 bio->bi_end_io          = journal_write_endio;
2418                 bio->bi_private         = ca;
2419                 bio_set_op_attrs(bio, REQ_OP_WRITE,
2420                                  REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
2421                 bch2_bio_map(bio, jset);
2422
2423                 trace_journal_write(bio);
2424                 closure_bio_submit(bio, cl);
2425
2426                 ca->journal.bucket_seq[ca->journal.cur_idx] = le64_to_cpu(w->data->seq);
2427         }
2428
2429         for_each_rw_member(ca, c, i)
2430                 if (journal_flushes_device(ca) &&
2431                     !bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), i)) {
2432                         percpu_ref_get(&ca->io_ref);
2433
2434                         bio = ca->journal.bio;
2435                         bio_reset(bio);
2436                         bio_set_dev(bio, ca->disk_sb.bdev);
2437                         bio->bi_opf             = REQ_OP_FLUSH;
2438                         bio->bi_end_io          = journal_write_endio;
2439                         bio->bi_private         = ca;
2440                         closure_bio_submit(bio, cl);
2441                 }
2442
2443 no_io:
2444         extent_for_each_ptr(bkey_i_to_s_extent(&j->key), ptr)
2445                 ptr->offset += sectors;
2446
2447         continue_at(cl, journal_write_done, system_highpri_wq);
2448 err:
2449         bch2_inconsistent_error(c);
2450         continue_at(cl, journal_write_done, system_highpri_wq);
2451 }
2452
2453 /*
2454  * returns true if there's nothing to flush and no journal write still in flight
2455  */
2456 static bool journal_flush_write(struct journal *j)
2457 {
2458         bool ret;
2459
2460         spin_lock(&j->lock);
2461         ret = !j->reservations.prev_buf_unwritten;
2462
2463         if (!journal_entry_is_open(j)) {
2464                 spin_unlock(&j->lock);
2465                 return ret;
2466         }
2467
2468         set_bit(JOURNAL_NEED_WRITE, &j->flags);
2469         if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
2470                 ret = false;
2471         else
2472                 spin_unlock(&j->lock);
2473         return ret;
2474 }
2475
2476 static void journal_write_work(struct work_struct *work)
2477 {
2478         struct journal *j = container_of(work, struct journal, write_work.work);
2479
2480         journal_flush_write(j);
2481 }
2482
2483 /*
2484  * Given an inode number, if that inode number has data in the journal that
2485  * hasn't yet been flushed, return the journal sequence number that needs to be
2486  * flushed:
2487  */
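     /*
      * has_inode is effectively a small per-buffer bloom filter keyed by
      * hash_64(inode): a collision can make this return a sequence number
      * for an inode with no journalled data, which only means the caller
      * flushes a bit more than strictly necessary - never less.
      */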
2488 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
2489 {
2490         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
2491         u64 seq = 0;
2492
2493         if (!test_bit(h, j->buf[0].has_inode) &&
2494             !test_bit(h, j->buf[1].has_inode))
2495                 return 0;
2496
2497         spin_lock(&j->lock);
2498         if (test_bit(h, journal_cur_buf(j)->has_inode))
2499                 seq = journal_cur_seq(j);
2500         else if (test_bit(h, journal_prev_buf(j)->has_inode))
2501                 seq = journal_cur_seq(j) - 1;
2502         spin_unlock(&j->lock);
2503
2504         return seq;
2505 }
2506
2507 static int __journal_res_get(struct journal *j, struct journal_res *res,
2508                               unsigned u64s_min, unsigned u64s_max)
2509 {
2510         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2511         struct journal_buf *buf;
2512         int ret;
2513 retry:
2514         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2515         if (ret)
2516                 return ret;
2517
2518         spin_lock(&j->lock);
2519         /*
2520          * Recheck after taking the lock, so we don't race with another thread
2521          * that just did journal_entry_open(), and end up calling
2522          * journal_entry_close() unnecessarily
2523          */
2524         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
2525         if (ret) {
2526                 spin_unlock(&j->lock);
2527                 return 1;
2528         }
2529
2530         /*
2531          * If we couldn't get a reservation because the current buf filled up,
2532          * and we had room for a bigger entry on disk, signal that we want to
2533          * realloc the journal bufs:
2534          */
2535         buf = journal_cur_buf(j);
2536         if (journal_entry_is_open(j) &&
2537             buf->size >> 9 < buf->disk_sectors &&
2538             buf->size < JOURNAL_ENTRY_SIZE_MAX)
2539                 j->buf_size_want = max(j->buf_size_want, buf->size << 1);
2540
2541         /*
2542          * Close the current journal entry if necessary, then try to start a new
2543          * one:
2544          */
2545         switch (journal_buf_switch(j, false)) {
2546         case JOURNAL_ENTRY_ERROR:
2547                 spin_unlock(&j->lock);
2548                 return -EROFS;
2549         case JOURNAL_ENTRY_INUSE:
2550                 /* haven't finished writing out the previous one: */
2551                 spin_unlock(&j->lock);
2552                 trace_journal_entry_full(c);
2553                 goto blocked;
2554         case JOURNAL_ENTRY_CLOSED:
2555                 break;
2556         case JOURNAL_UNLOCKED:
2557                 goto retry;
2558         }
2559
2560         /* We now have a new, closed journal buf - see if we can open it: */
2561         ret = journal_entry_open(j);
2562         spin_unlock(&j->lock);
2563
2564         if (ret < 0)
2565                 return ret;
2566         if (ret)
2567                 goto retry;
2568
2569         /* Journal's full, we have to wait */
2570
2571         /*
2572          * Direct reclaim - can't rely on reclaim from work item
2573          * due to freezing..
2574          */
2575         journal_reclaim_work(&j->reclaim_work.work);
2576
2577         trace_journal_full(c);
2578 blocked:
2579         if (!j->res_get_blocked_start)
2580                 j->res_get_blocked_start = local_clock() ?: 1;
2581         return 0;
2582 }
2583
2584 /*
2585  * Essentially the entry function to the journaling code. When bcachefs is doing
2586  * a btree insert, it calls this function to get the current journal write.
2587  * Journal write is the structure used to set up journal writes. The calling
2588  * function will then add its keys to the structure, queuing them for the next
2589  * write.
2590  *
2591  * To ensure forward progress, the current task must not be holding any
2592  * btree node write locks.
2593  */
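     /*
      * Rough sketch of the calling pattern (simplified; the real callers
      * live in the btree update path):
      *
      *    struct journal_res res = { 0 };
      *
      *    bch2_journal_res_get(j, &res, u64s, u64s);
      *    ... copy keys into the reservation ...
      *    bch2_journal_res_put(j, &res);
      *    bch2_journal_flush_seq(j, res.seq);    (if it must be on disk)
      */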
2594 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
2595                                  unsigned u64s_min, unsigned u64s_max)
2596 {
2597         int ret;
2598
2599         wait_event(j->wait,
2600                    (ret = __journal_res_get(j, res, u64s_min,
2601                                             u64s_max)));
2602         return ret < 0 ? ret : 0;
2603 }
2604
2605 u64 bch2_journal_last_unwritten_seq(struct journal *j)
2606 {
2607         u64 seq;
2608
2609         spin_lock(&j->lock);
2610         seq = journal_cur_seq(j);
2611         if (j->reservations.prev_buf_unwritten)
2612                 seq--;
2613         spin_unlock(&j->lock);
2614
2615         return seq;
2616 }
2617
2618 int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
2619 {
2620         int ret;
2621
2622         spin_lock(&j->lock);
2623         BUG_ON(seq > journal_cur_seq(j));
2624
2625         if (seq < journal_cur_seq(j) ||
2626             journal_entry_is_open(j)) {
2627                 spin_unlock(&j->lock);
2628                 return 1;
2629         }
2630
2631         ret = journal_entry_open(j);
2632         if (!ret)
2633                 closure_wait(&j->async_wait, parent);
2634         spin_unlock(&j->lock);
2635
2636         if (!ret)
2637                 journal_reclaim_work(&j->reclaim_work.work);
2638
2639         return ret;
2640 }
2641
2642 void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
2643 {
2644         spin_lock(&j->lock);
2645
2646         BUG_ON(seq > journal_cur_seq(j));
2647
2648         if (bch2_journal_error(j)) {
2649                 spin_unlock(&j->lock);
2650                 return;
2651         }
2652
2653         if (seq == journal_cur_seq(j)) {
2654                 if (!closure_wait(&journal_cur_buf(j)->wait, parent))
2655                         BUG();
2656         } else if (seq + 1 == journal_cur_seq(j) &&
2657                    j->reservations.prev_buf_unwritten) {
2658                 if (!closure_wait(&journal_prev_buf(j)->wait, parent))
2659                         BUG();
2660
2661                 smp_mb();
2662
2663                 /* check if raced with write completion (or failure) */
2664                 if (!j->reservations.prev_buf_unwritten ||
2665                     bch2_journal_error(j))
2666                         closure_wake_up(&journal_prev_buf(j)->wait);
2667         }
2668
2669         spin_unlock(&j->lock);
2670 }
2671
2672 void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
2673 {
2674         struct journal_buf *buf;
2675
2676         spin_lock(&j->lock);
2677
2678         BUG_ON(seq > journal_cur_seq(j));
2679
2680         if (bch2_journal_error(j)) {
2681                 spin_unlock(&j->lock);
2682                 return;
2683         }
2684
2685         if (seq == journal_cur_seq(j)) {
2686                 bool set_need_write = false;
2687
2688                 buf = journal_cur_buf(j);
2689
2690                 if (parent && !closure_wait(&buf->wait, parent))
2691                         BUG();
2692
2693                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2694                         j->need_write_time = local_clock();
2695                         set_need_write = true;
2696                 }
2697
2698                 switch (journal_buf_switch(j, set_need_write)) {
2699                 case JOURNAL_ENTRY_ERROR:
2700                         if (parent)
2701                                 closure_wake_up(&buf->wait);
2702                         break;
2703                 case JOURNAL_ENTRY_CLOSED:
2704                         /*
2705                          * Journal entry hasn't been opened yet, but caller
2706                          * claims it has something
2707                          */
2708                         BUG();
2709                 case JOURNAL_ENTRY_INUSE:
2710                         break;
2711                 case JOURNAL_UNLOCKED:
2712                         return;
2713                 }
2714         } else if (parent &&
2715                    seq + 1 == journal_cur_seq(j) &&
2716                    j->reservations.prev_buf_unwritten) {
2717                 buf = journal_prev_buf(j);
2718
2719                 if (!closure_wait(&buf->wait, parent))
2720                         BUG();
2721
2722                 smp_mb();
2723
2724                 /* check if raced with write completion (or failure) */
2725                 if (!j->reservations.prev_buf_unwritten ||
2726                     bch2_journal_error(j))
2727                         closure_wake_up(&buf->wait);
2728         }
2729
2730         spin_unlock(&j->lock);
2731 }
2732
2733 static int journal_seq_flushed(struct journal *j, u64 seq)
2734 {
2735         struct journal_buf *buf;
2736         int ret = 1;
2737
2738         spin_lock(&j->lock);
2739         BUG_ON(seq > journal_cur_seq(j));
2740
2741         if (seq == journal_cur_seq(j)) {
2742                 bool set_need_write = false;
2743
2744                 ret = 0;
2745
2746                 buf = journal_cur_buf(j);
2747
2748                 if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
2749                         j->need_write_time = local_clock();
2750                         set_need_write = true;
2751                 }
2752
2753                 switch (journal_buf_switch(j, set_need_write)) {
2754                 case JOURNAL_ENTRY_ERROR:
2755                         ret = -EIO;
2756                         break;
2757                 case JOURNAL_ENTRY_CLOSED:
2758                         /*
2759                          * Journal entry hasn't been opened yet, but caller
2760                          * claims it has something
2761                          */
2762                         BUG();
2763                 case JOURNAL_ENTRY_INUSE:
2764                         break;
2765                 case JOURNAL_UNLOCKED:
2766                         return 0;
2767                 }
2768         } else if (seq + 1 == journal_cur_seq(j) &&
2769                    j->reservations.prev_buf_unwritten) {
2770                 ret = bch2_journal_error(j);
2771         }
2772
2773         spin_unlock(&j->lock);
2774
2775         return ret;
2776 }
2777
2778 int bch2_journal_flush_seq(struct journal *j, u64 seq)
2779 {
2780         u64 start_time = local_clock();
2781         int ret, ret2;
2782
2783         ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
2784
2785         bch2_time_stats_update(j->flush_seq_time, start_time);
2786
2787         return ret ?: ret2 < 0 ? ret2 : 0;
2788 }
2789
2790 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
2791 {
2792         struct journal_res res;
2793         unsigned u64s = jset_u64s(0);
2794
2795         memset(&res, 0, sizeof(res));
2796
2797         bch2_journal_res_get(j, &res, u64s, u64s);
2798         bch2_journal_res_put(j, &res);
2799
2800         bch2_journal_flush_seq_async(j, res.seq, parent);
2801 }
2802
2803 int bch2_journal_meta(struct journal *j)
2804 {
2805         struct journal_res res;
2806         unsigned u64s = jset_u64s(0);
2807         int ret;
2808
2809         memset(&res, 0, sizeof(res));
2810
2811         ret = bch2_journal_res_get(j, &res, u64s, u64s);
2812         if (ret)
2813                 return ret;
2814
2815         bch2_journal_res_put(j, &res);
2816
2817         return bch2_journal_flush_seq(j, res.seq);
2818 }
2819
2820 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
2821 {
2822         u64 seq, journal_seq;
2823
2824         spin_lock(&j->lock);
2825         journal_seq = journal_cur_seq(j);
2826
2827         if (journal_entry_is_open(j)) {
2828                 seq = journal_seq;
2829         } else if (journal_seq) {
2830                 seq = journal_seq - 1;
2831         } else {
2832                 spin_unlock(&j->lock);
2833                 return;
2834         }
2835         spin_unlock(&j->lock);
2836
2837         bch2_journal_flush_seq_async(j, seq, parent);
2838 }
2839
2840 int bch2_journal_flush(struct journal *j)
2841 {
2842         u64 seq, journal_seq;
2843
2844         spin_lock(&j->lock);
2845         journal_seq = journal_cur_seq(j);
2846
2847         if (journal_entry_is_open(j)) {
2848                 seq = journal_seq;
2849         } else if (journal_seq) {
2850                 seq = journal_seq - 1;
2851         } else {
2852                 spin_unlock(&j->lock);
2853                 return 0;
2854         }
2855         spin_unlock(&j->lock);
2856
2857         return bch2_journal_flush_seq(j, seq);
2858 }
2859
2860 int bch2_journal_flush_device(struct journal *j, int dev_idx)
2861 {
2862         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2863         struct journal_entry_pin_list *p;
2864         struct bch_devs_list devs;
2865         u64 iter, seq = 0;
2866         int ret = 0;
2867
2868         spin_lock(&j->lock);
2869         fifo_for_each_entry_ptr(p, &j->pin, iter)
2870                 if (dev_idx >= 0
2871                     ? bch2_dev_list_has_dev(p->devs, dev_idx)
2872                     : p->devs.nr < c->opts.metadata_replicas)
2873                         seq = iter;
2874         spin_unlock(&j->lock);
2875
2876         ret = bch2_journal_flush_pins(j, seq);
2877         if (ret)
2878                 return ret;
2879
2880         mutex_lock(&c->replicas_gc_lock);
2881         bch2_replicas_gc_start(c, 1 << BCH_DATA_JOURNAL);
2882
2883         seq = 0;
2884
2885         spin_lock(&j->lock);
2886         while (!ret && seq < j->pin.back) {
2887                 seq = max(seq, journal_last_seq(j));
2888                 devs = journal_seq_pin(j, seq)->devs;
2889                 seq++;
2890
2891                 spin_unlock(&j->lock);
2892                 ret = bch2_mark_replicas(c, BCH_DATA_JOURNAL, devs);
2893                 spin_lock(&j->lock);
2894         }
2895         spin_unlock(&j->lock);
2896
2897         bch2_replicas_gc_end(c, ret);
2898         mutex_unlock(&c->replicas_gc_lock);
2899
2900         return ret;
2901 }
2902
2903 /* startup/shutdown: */
2904
2905 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
2906 {
2907         union journal_res_state state;
2908         struct journal_buf *w;
2909         bool ret;
2910
2911         spin_lock(&j->lock);
2912         state = READ_ONCE(j->reservations);
2913         w = j->buf + !state.idx;
2914
2915         ret = state.prev_buf_unwritten &&
2916                 bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
2917         spin_unlock(&j->lock);
2918
2919         return ret;
2920 }
2921
2922 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
2923 {
2924         spin_lock(&j->lock);
2925         bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
2926         spin_unlock(&j->lock);
2927
2928         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
2929 }
2930
2931 void bch2_fs_journal_stop(struct journal *j)
2932 {
2933         wait_event(j->wait, journal_flush_write(j));
2934
2935         cancel_delayed_work_sync(&j->write_work);
2936         cancel_delayed_work_sync(&j->reclaim_work);
2937 }
2938
2939 void bch2_dev_journal_exit(struct bch_dev *ca)
2940 {
2941         kfree(ca->journal.bio);
2942         kfree(ca->journal.buckets);
2943         kfree(ca->journal.bucket_seq);
2944
2945         ca->journal.bio         = NULL;
2946         ca->journal.buckets     = NULL;
2947         ca->journal.bucket_seq  = NULL;
2948 }
2949
2950 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
2951 {
2952         struct journal_device *ja = &ca->journal;
2953         struct bch_sb_field_journal *journal_buckets =
2954                 bch2_sb_get_journal(sb);
2955         unsigned i;
2956
2957         ja->nr = bch2_nr_journal_buckets(journal_buckets);
2958
2959         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2960         if (!ja->bucket_seq)
2961                 return -ENOMEM;
2962
2963         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
2964                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
2965         if (!ca->journal.bio)
2966                 return -ENOMEM;
2967
2968         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
2969         if (!ja->buckets)
2970                 return -ENOMEM;
2971
2972         for (i = 0; i < ja->nr; i++)
2973                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
2974
2975         return 0;
2976 }
2977
2978 void bch2_fs_journal_exit(struct journal *j)
2979 {
2980         kvpfree(j->buf[1].data, j->buf[1].size);
2981         kvpfree(j->buf[0].data, j->buf[0].size);
2982         free_fifo(&j->pin);
2983 }
2984
2985 int bch2_fs_journal_init(struct journal *j)
2986 {
2987         struct bch_fs *c = container_of(j, struct bch_fs, journal);
2988         static struct lock_class_key res_key;
2989         int ret = 0;
2990
2991         pr_verbose_init(c->opts, "");
2992
2993         spin_lock_init(&j->lock);
2994         spin_lock_init(&j->err_lock);
2995         init_waitqueue_head(&j->wait);
2996         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
2997         INIT_DELAYED_WORK(&j->reclaim_work, journal_reclaim_work);
2998         mutex_init(&j->blacklist_lock);
2999         INIT_LIST_HEAD(&j->seq_blacklist);
3000         mutex_init(&j->reclaim_lock);
3001
3002         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
3003
3004         j->buf[0].size          = JOURNAL_ENTRY_SIZE_MIN;
3005         j->buf[1].size          = JOURNAL_ENTRY_SIZE_MIN;
3006         j->write_delay_ms       = 1000;
3007         j->reclaim_delay_ms     = 100;
3008
3009         bkey_extent_init(&j->key);
3010
3011         atomic64_set(&j->reservations.counter,
3012                 ((union journal_res_state)
3013                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
3014
3015         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
3016             !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
3017             !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
3018                 ret = -ENOMEM;
3019                 goto out;
3020         }
3021
3022         j->pin.front = j->pin.back = 1;
3023 out:
3024         pr_verbose_init(c->opts, "ret %i", ret);
3025         return ret;
3026 }
3027
3028 /* debug: */
3029
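/*
 * Format a human readable summary of journal state into buf (at most
 * PAGE_SIZE bytes), for debugging:
 */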
3030 ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
3031 {
3032         struct bch_fs *c = container_of(j, struct bch_fs, journal);
3033         union journal_res_state *s = &j->reservations;
3034         struct bch_dev *ca;
3035         unsigned iter;
3036         ssize_t ret = 0;
3037
3038         rcu_read_lock();
3039         spin_lock(&j->lock);
3040
3041         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3042                          "active journal entries:\t%llu\n"
3043                          "seq:\t\t\t%llu\n"
3044                          "last_seq:\t\t%llu\n"
3045                          "last_seq_ondisk:\t%llu\n"
3046                          "reservation count:\t%u\n"
3047                          "reservation offset:\t%u\n"
3048                          "current entry u64s:\t%u\n"
3049                          "io in flight:\t\t%i\n"
3050                          "need write:\t\t%i\n"
3051                          "dirty:\t\t\t%i\n"
3052                          "replay done:\t\t%i\n",
3053                          fifo_used(&j->pin),
3054                          journal_cur_seq(j),
3055                          journal_last_seq(j),
3056                          j->last_seq_ondisk,
3057                          journal_state_count(*s, s->idx),
3058                          s->cur_entry_offset,
3059                          j->cur_entry_u64s,
3060                          s->prev_buf_unwritten,
3061                          test_bit(JOURNAL_NEED_WRITE,   &j->flags),
3062                          journal_entry_is_open(j),
3063                          test_bit(JOURNAL_REPLAY_DONE,  &j->flags));
3064
3065         for_each_member_device_rcu(ca, c, iter,
3066                                    &c->rw_devs[BCH_DATA_JOURNAL]) {
3067                 struct journal_device *ja = &ca->journal;
3068
3069                 if (!ja->nr)
3070                         continue;
3071
3072                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3073                                  "dev %u:\n"
3074                                  "\tnr\t\t%u\n"
3075                                  "\tcur_idx\t\t%u (seq %llu)\n"
3076                                  "\tlast_idx\t%u (seq %llu)\n",
3077                                  iter, ja->nr,
3078                                  ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
3079                                  ja->last_idx,  ja->bucket_seq[ja->last_idx]);
3080         }
3081
3082         spin_unlock(&j->lock);
3083         rcu_read_unlock();
3084
3085         return ret;
3086 }
3087
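/*
 * Print the journal pin fifo into buf: one line per dirty journal entry,
 * followed by its outstanding and already flushed pins:
 */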
3088 ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
3089 {
3090         struct journal_entry_pin_list *pin_list;
3091         struct journal_entry_pin *pin;
3092         ssize_t ret = 0;
3093         u64 i;
3094
3095         spin_lock(&j->lock);
3096         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
3097                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3098                                  "%llu: count %u\n",
3099                                  i, atomic_read(&pin_list->count));
3100
3101                 list_for_each_entry(pin, &pin_list->list, list)
3102                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3103                                          "\t%p %pf\n",
3104                                          pin, pin->flush);
3105
3106                 if (!list_empty(&pin_list->flushed))
3107                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3108                                          "flushed:\n");
3109
3110                 list_for_each_entry(pin, &pin_list->flushed, list)
3111                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
3112                                          "\t%p %pf\n",
3113                                          pin, pin->flush);
3114         }
3115         spin_unlock(&j->lock);
3116
3117         return ret;
3118 }