1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * bcachefs journalling code, for btree insertions
4  *
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcachefs.h"
9 #include "alloc_foreground.h"
10 #include "bkey_methods.h"
11 #include "btree_gc.h"
12 #include "buckets.h"
13 #include "journal.h"
14 #include "journal_io.h"
15 #include "journal_reclaim.h"
16 #include "journal_seq_blacklist.h"
17 #include "super-io.h"
18
19 #include <trace/events/bcachefs.h>
20
21 static u64 last_unwritten_seq(struct journal *j)
22 {
23         union journal_res_state s = READ_ONCE(j->reservations);
24
25         lockdep_assert_held(&j->lock);
26
27         return journal_cur_seq(j) - s.prev_buf_unwritten;
28 }
29
30 static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
31 {
32         return seq >= last_unwritten_seq(j);
33 }
34
35 static bool __journal_entry_is_open(union journal_res_state state)
36 {
37         return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
38 }
39
40 static bool journal_entry_is_open(struct journal *j)
41 {
42         return __journal_entry_is_open(j->reservations);
43 }
44
45 static inline struct journal_buf *
46 journal_seq_to_buf(struct journal *j, u64 seq)
47 {
48         struct journal_buf *buf = NULL;
49
50         EBUG_ON(seq > journal_cur_seq(j));
51         EBUG_ON(seq == journal_cur_seq(j) &&
52                 j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
53
54         if (journal_seq_unwritten(j, seq)) {
55                 buf = j->buf + (seq & 1);
56                 EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
57         }
58         return buf;
59 }
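/*
 * The journal is double buffered, so sequence number parity picks the
 * buffer: e.g. seq 10 maps to j->buf[0] and seq 11 to j->buf[1], provided
 * the entry in question hasn't been written out yet (illustrative numbers
 * only).
 */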
60
61 static void journal_pin_new_entry(struct journal *j, int count)
62 {
63         struct journal_entry_pin_list *p;
64
65         /*
66          * The fifo_push() needs to happen at the same time as j->seq is
67          * incremented for journal_last_seq() to be calculated correctly
68          */
69         atomic64_inc(&j->seq);
70         p = fifo_push_ref(&j->pin);
71
72         INIT_LIST_HEAD(&p->list);
73         INIT_LIST_HEAD(&p->flushed);
74         atomic_set(&p->count, count);
75         p->devs.nr = 0;
76 }
77
78 static void bch2_journal_buf_init(struct journal *j)
79 {
80         struct journal_buf *buf = journal_cur_buf(j);
81
82         bkey_extent_init(&buf->key);
83
84         memset(buf->has_inode, 0, sizeof(buf->has_inode));
85
86         memset(buf->data, 0, sizeof(*buf->data));
87         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
88         buf->data->u64s = 0;
89 }
90
91 void bch2_journal_halt(struct journal *j)
92 {
93         union journal_res_state old, new;
94         u64 v = atomic64_read(&j->reservations.counter);
95
96         do {
97                 old.v = new.v = v;
98                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
99                         return;
100
101                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
102         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
103                                        old.v, new.v)) != old.v);
104
105         j->err_seq = journal_cur_seq(j);
106         journal_wake(j);
107         closure_wake_up(&journal_cur_buf(j)->wait);
108 }
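/*
 * Updates to the reservation state above are done with a lockless cmpxchg
 * loop on the 64-bit counter: re-read, compute the new state, retry on
 * contention. A minimal sketch of the pattern used throughout this file
 * (illustrative only):
 *
 *	union journal_res_state old, new;
 *	u64 v = atomic64_read(&j->reservations.counter);
 *
 *	do {
 *		old.v = new.v = v;
 *		// modify fields of new here
 *	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 *				       old.v, new.v)) != old.v);
 */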
109
110 /* journal entry close/open: */
111
112 void __bch2_journal_buf_put(struct journal *j, bool need_write_just_set)
113 {
114         if (!need_write_just_set &&
115             test_bit(JOURNAL_NEED_WRITE, &j->flags))
116                 bch2_time_stats_update(j->delay_time,
117                                        j->need_write_time);
118
119         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
120
121         closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
122 }
123
124 /*
125  * Returns true if journal entry is now closed:
126  */
127 static bool __journal_entry_close(struct journal *j)
128 {
129         struct bch_fs *c = container_of(j, struct bch_fs, journal);
130         struct journal_buf *buf = journal_cur_buf(j);
131         union journal_res_state old, new;
132         u64 v = atomic64_read(&j->reservations.counter);
133         bool set_need_write = false;
134         unsigned sectors;
135
136         lockdep_assert_held(&j->lock);
137
138         do {
139                 old.v = new.v = v;
140                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
141                         return true;
142
143                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
144                         /* this entry will never be written: */
145                         closure_wake_up(&buf->wait);
146                         return true;
147                 }
148
149                 if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) {
150                         set_bit(JOURNAL_NEED_WRITE, &j->flags);
151                         j->need_write_time = local_clock();
152                         set_need_write = true;
153                 }
154
155                 if (new.prev_buf_unwritten)
156                         return false;
157
158                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
159                 new.idx++;
160                 new.prev_buf_unwritten = 1;
161
162                 BUG_ON(journal_state_count(new, new.idx));
163         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
164                                        old.v, new.v)) != old.v);
165
166         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
167
168         sectors = vstruct_blocks_plus(buf->data, c->block_bits,
169                                       buf->u64s_reserved) << c->block_bits;
170         BUG_ON(sectors > buf->sectors);
171         buf->sectors = sectors;
172
173         /*
174          * We have to set last_seq here, _before_ opening a new journal entry:
175          *
176          * A thread may replace an old pin with a new pin on its current
177          * journal reservation - the expectation being that the journal will
178          * contain either what the old pin protected or what the new pin
179          * protects.
180          *
181          * After the old pin is dropped journal_last_seq() won't include the old
182          * pin, so we can only write the updated last_seq on the entry that
183          * contains whatever the new pin protects.
184          *
185          * Restated, we can _not_ update last_seq for a given entry if there
186          * could be a newer entry open with reservations/pins that have been
187          * taken against it.
188          *
189          * Hence, we want to update/set last_seq on the current journal entry right
190          * before we open a new one:
191          */
192         buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));
193
194         journal_pin_new_entry(j, 1);
195
196         bch2_journal_buf_init(j);
197
198         cancel_delayed_work(&j->write_work);
199
200         bch2_journal_space_available(j);
201
202         bch2_journal_buf_put(j, old.idx, set_need_write);
203         return true;
204 }
205
206 static bool journal_entry_close(struct journal *j)
207 {
208         bool ret;
209
210         spin_lock(&j->lock);
211         ret = __journal_entry_close(j);
212         spin_unlock(&j->lock);
213
214         return ret;
215 }
216
217 /*
218  * should _only_ be called from journal_res_get() - when we actually want a
219  * journal reservation - an open journal entry means the journal is dirty:
220  *
221  * returns:
222  * 0:           success
223  * -ENOSPC:     journal currently full, must invoke reclaim
224  * -EAGAIN:     journal blocked, must wait
225  * -EROFS:      insufficient rw devices or journal error
226  */
227 static int journal_entry_open(struct journal *j)
228 {
229         struct bch_fs *c = container_of(j, struct bch_fs, journal);
230         struct journal_buf *buf = journal_cur_buf(j);
231         union journal_res_state old, new;
232         int u64s;
233         u64 v;
234
235         BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));
236
237         lockdep_assert_held(&j->lock);
238         BUG_ON(journal_entry_is_open(j));
239
240         if (j->blocked)
241                 return cur_entry_blocked;
242
243         if (j->cur_entry_error)
244                 return j->cur_entry_error;
245
246         BUG_ON(!j->cur_entry_sectors);
247
248         buf->u64s_reserved      = j->entry_u64s_reserved;
249         buf->disk_sectors       = j->cur_entry_sectors;
250         buf->sectors            = min(buf->disk_sectors, buf->buf_size >> 9);
251
252         u64s = (int) (buf->sectors << 9) / sizeof(u64) -
253                 journal_entry_overhead(j);
254         u64s  = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
255
256         if (u64s <= le32_to_cpu(buf->data->u64s))
257                 return cur_entry_journal_full;
258
259         /*
260          * Must be set before marking the journal entry as open:
261          */
262         j->cur_entry_u64s = u64s;
263
264         v = atomic64_read(&j->reservations.counter);
265         do {
266                 old.v = new.v = v;
267
268                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
269                         return cur_entry_insufficient_devices;
270
271                 /* Handle any already added entries */
272                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
273
274                 EBUG_ON(journal_state_count(new, new.idx));
275                 journal_state_inc(&new);
276         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
277                                        old.v, new.v)) != old.v);
278
279         if (j->res_get_blocked_start)
280                 bch2_time_stats_update(j->blocked_time,
281                                        j->res_get_blocked_start);
282         j->res_get_blocked_start = 0;
283
284         mod_delayed_work(system_freezable_wq,
285                          &j->write_work,
286                          msecs_to_jiffies(j->write_delay_ms));
287         journal_wake(j);
288         return 0;
289 }
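/*
 * A worked example of the entry size calculation above (illustrative
 * numbers): with buf->sectors == 8, 8 << 9 = 4096 bytes = 512 u64s of
 * space, minus journal_entry_overhead(j), clamped to
 * [0, JOURNAL_ENTRY_CLOSED_VAL - 1].
 */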
290
291 static bool journal_quiesced(struct journal *j)
292 {
293         union journal_res_state state = READ_ONCE(j->reservations);
294         bool ret = !state.prev_buf_unwritten && !__journal_entry_is_open(state);
295
296         if (!ret)
297                 journal_entry_close(j);
298         return ret;
299 }
300
301 static void journal_quiesce(struct journal *j)
302 {
303         wait_event(j->wait, journal_quiesced(j));
304 }
305
306 static void journal_write_work(struct work_struct *work)
307 {
308         struct journal *j = container_of(work, struct journal, write_work.work);
309
310         journal_entry_close(j);
311 }
312
313 /*
314  * Given an inode number, if that inode number has data in the journal that
315  * hasn't yet been flushed, return the journal sequence number that needs to be
316  * flushed:
317  */
318 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
319 {
320         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
321         u64 seq = 0;
322
323         if (!test_bit(h, j->buf[0].has_inode) &&
324             !test_bit(h, j->buf[1].has_inode))
325                 return 0;
326
327         spin_lock(&j->lock);
328         if (test_bit(h, journal_cur_buf(j)->has_inode))
329                 seq = journal_cur_seq(j);
330         else if (test_bit(h, journal_prev_buf(j)->has_inode))
331                 seq = journal_cur_seq(j) - 1;
332         spin_unlock(&j->lock);
333
334         return seq;
335 }
336
337 void bch2_journal_set_has_inum(struct journal *j, u64 inode, u64 seq)
338 {
339         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
340         struct journal_buf *buf;
341
342         spin_lock(&j->lock);
343
344         if ((buf = journal_seq_to_buf(j, seq)))
345                 set_bit(h, buf->has_inode);
346
347         spin_unlock(&j->lock);
348 }
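/*
 * Together, these two let fsync-style callers avoid unnecessary flushes:
 * look up the newest journal sequence number that may hold data for the
 * inode, and only flush if it is nonzero. A minimal sketch (inum is a
 * placeholder for the caller's inode number):
 *
 *	u64 seq = bch2_inode_journal_seq(&c->journal, inum);
 *
 *	if (seq)
 *		ret = bch2_journal_flush_seq(&c->journal, seq);
 */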
349
350 static int __journal_res_get(struct journal *j, struct journal_res *res,
351                              unsigned flags)
352 {
353         struct bch_fs *c = container_of(j, struct bch_fs, journal);
354         struct journal_buf *buf;
355         bool can_discard;
356         int ret;
357 retry:
358         if (journal_res_get_fast(j, res, flags))
359                 return 0;
360
361         if (bch2_journal_error(j))
362                 return -EROFS;
363
364         spin_lock(&j->lock);
365
366         /*
367          * Recheck after taking the lock, so we don't race with another thread
368          * that just did journal_entry_open(), and end up calling
369          * journal_entry_close() unnecessarily
370          */
371         if (journal_res_get_fast(j, res, flags)) {
372                 spin_unlock(&j->lock);
373                 return 0;
374         }
375
376         if (!(flags & JOURNAL_RES_GET_RESERVED) &&
377             !test_bit(JOURNAL_MAY_GET_UNRESERVED, &j->flags)) {
378                 /*
379                  * Don't want to close current journal entry, just need to
380                  * invoke reclaim:
381                  */
382                 ret = cur_entry_journal_full;
383                 goto unlock;
384         }
385
386         /*
387          * If we couldn't get a reservation because the current buf filled up,
388          * and we had room for a bigger entry on disk, signal that we want to
389          * realloc the journal bufs:
390          */
391         buf = journal_cur_buf(j);
392         if (journal_entry_is_open(j) &&
393             buf->buf_size >> 9 < buf->disk_sectors &&
394             buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
395                 j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
396
397         if (journal_entry_is_open(j) &&
398             !__journal_entry_close(j)) {
399                 /*
400                  * We failed to get a reservation on the current open journal
401                  * entry because it's full, and we can't close it because
402                  * there's still a previous one in flight:
403                  */
404                 trace_journal_entry_full(c);
405                 ret = cur_entry_blocked;
406         } else {
407                 ret = journal_entry_open(j);
408         }
409 unlock:
410         if ((ret && ret != cur_entry_insufficient_devices) &&
411             !j->res_get_blocked_start) {
412                 j->res_get_blocked_start = local_clock() ?: 1;
413                 trace_journal_full(c);
414         }
415
416         can_discard = j->can_discard;
417         spin_unlock(&j->lock);
418
419         if (!ret)
420                 goto retry;
421
422         if (WARN_ONCE(ret == cur_entry_journal_full &&
423                       !can_discard &&
424                       (flags & JOURNAL_RES_GET_RESERVED),
425                       "JOURNAL_RES_GET_RESERVED set but journal full")) {
426                 char *buf;
427
428                 buf = kmalloc(4096, GFP_NOFS);
429                 if (buf) {
430                         bch2_journal_debug_to_text(&_PBUF(buf, 4096), j);
431                         pr_err("\n%s", buf);
432                         kfree(buf);
433                 }
434         }
435
436         /*
437          * Journal is full - can't rely on reclaim from work item due to
438          * freezing:
439          */
440         if ((ret == cur_entry_journal_full ||
441              ret == cur_entry_journal_pin_full) &&
442             !(flags & JOURNAL_RES_GET_NONBLOCK)) {
443                 if (can_discard) {
444                         bch2_journal_do_discards(j);
445                         goto retry;
446                 }
447
448                 if (mutex_trylock(&j->reclaim_lock)) {
449                         bch2_journal_reclaim(j);
450                         mutex_unlock(&j->reclaim_lock);
451                 }
452         }
453
454         return ret == cur_entry_insufficient_devices ? -EROFS : -EAGAIN;
455 }
456
457 /*
458  * Essentially the entry point to the journalling code. When bcachefs is doing
459  * a btree insert, it calls this function to get the current journal write.
460  * The journal write is the structure used to set up journal writes. The calling
461  * function will then add its keys to the structure, queuing them for the next
462  * write.
463  *
464  * To ensure forward progress, the current task must not be holding any
465  * btree node write locks.
466  */
467 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
468                                   unsigned flags)
469 {
470         int ret;
471
472         closure_wait_event(&j->async_wait,
473                    (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
474                    (flags & JOURNAL_RES_GET_NONBLOCK));
475         return ret;
476 }
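/*
 * The usual reservation lifecycle, which bch2_journal_meta() below also
 * follows: get a reservation sized in u64s, copy keys into the reserved
 * space, release the reservation, then optionally flush the sequence
 * number it landed in. A minimal sketch (u64s is a placeholder for the
 * caller's key size):
 *
 *	struct journal_res res = { 0 };
 *
 *	ret = bch2_journal_res_get(j, &res, jset_u64s(u64s), 0);
 *	if (ret)
 *		return ret;
 *
 *	// ... copy keys into the space reserved by res ...
 *
 *	bch2_journal_res_put(j, &res);
 *	return bch2_journal_flush_seq(j, res.seq);
 */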
477
478 /* journal_preres: */
479
480 static bool journal_preres_available(struct journal *j,
481                                      struct journal_preres *res,
482                                      unsigned new_u64s,
483                                      unsigned flags)
484 {
485         bool ret = bch2_journal_preres_get_fast(j, res, new_u64s, flags);
486
487         if (!ret && mutex_trylock(&j->reclaim_lock)) {
488                 bch2_journal_reclaim(j);
489                 mutex_unlock(&j->reclaim_lock);
490         }
491
492         return ret;
493 }
494
495 int __bch2_journal_preres_get(struct journal *j,
496                               struct journal_preres *res,
497                               unsigned new_u64s,
498                               unsigned flags)
499 {
500         int ret;
501
502         closure_wait_event(&j->preres_wait,
503                    (ret = bch2_journal_error(j)) ||
504                    journal_preres_available(j, res, new_u64s, flags));
505         return ret;
506 }
507
508 /* journal_entry_res: */
509
510 void bch2_journal_entry_res_resize(struct journal *j,
511                                    struct journal_entry_res *res,
512                                    unsigned new_u64s)
513 {
514         union journal_res_state state;
515         int d = new_u64s - res->u64s;
516
517         spin_lock(&j->lock);
518
519         j->entry_u64s_reserved += d;
520         if (d <= 0)
521                 goto out;
522
523         j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
524         smp_mb();
525         state = READ_ONCE(j->reservations);
526
527         if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
528             state.cur_entry_offset > j->cur_entry_u64s) {
529                 j->cur_entry_u64s += d;
530                 /*
531                  * Not enough room in current journal entry, have to flush it:
532                  */
533                 __journal_entry_close(j);
534         } else {
535                 journal_cur_buf(j)->u64s_reserved += d;
536         }
537 out:
538         spin_unlock(&j->lock);
539         res->u64s += d;
540 }
541
542 /* journal flushing: */
543
544 /**
545  * bch2_journal_flush_seq_async - wait for a journal entry to be written
546  *
547  * Like bch2_journal_wait_on_seq, except that it triggers a write immediately if
548  * necessary
549  */
550 int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
551                                  struct closure *parent)
552 {
553         struct journal_buf *buf;
554         int ret = 0;
555
556         if (seq <= j->err_seq)
557                 return -EIO;
558
559         if (seq <= j->seq_ondisk)
560                 return 1;
561
562         spin_lock(&j->lock);
563
564         /* Recheck under lock: */
565         if (seq <= j->err_seq) {
566                 ret = -EIO;
567                 goto out;
568         }
569
570         if (seq <= j->seq_ondisk) {
571                 ret = 1;
572                 goto out;
573         }
574
575         if (parent &&
576             (buf = journal_seq_to_buf(j, seq)))
577                 if (!closure_wait(&buf->wait, parent))
578                         BUG();
579
580         if (seq == journal_cur_seq(j))
581                 __journal_entry_close(j);
582 out:
583         spin_unlock(&j->lock);
584         return ret;
585 }
586
587 int bch2_journal_flush_seq(struct journal *j, u64 seq)
588 {
589         u64 start_time = local_clock();
590         int ret, ret2;
591
592         ret = wait_event_killable(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));
593
594         bch2_time_stats_update(j->flush_seq_time, start_time);
595
596         return ret ?: ret2 < 0 ? ret2 : 0;
597 }
598
599 int bch2_journal_meta(struct journal *j)
600 {
601         struct journal_res res;
602         int ret;
603
604         memset(&res, 0, sizeof(res));
605
606         ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
607         if (ret)
608                 return ret;
609
610         bch2_journal_res_put(j, &res);
611
612         return bch2_journal_flush_seq(j, res.seq);
613 }
614
615 /*
616  * bch2_journal_flush_async - if there is an open journal entry, or a journal entry
617  * still being written, write it and wait for the write to complete
618  */
619 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
620 {
621         u64 seq, journal_seq;
622
623         spin_lock(&j->lock);
624         journal_seq = journal_cur_seq(j);
625
626         if (journal_entry_is_open(j)) {
627                 seq = journal_seq;
628         } else if (journal_seq) {
629                 seq = journal_seq - 1;
630         } else {
631                 spin_unlock(&j->lock);
632                 return;
633         }
634         spin_unlock(&j->lock);
635
636         bch2_journal_flush_seq_async(j, seq, parent);
637 }
638
639 int bch2_journal_flush(struct journal *j)
640 {
641         u64 seq, journal_seq;
642
643         spin_lock(&j->lock);
644         journal_seq = journal_cur_seq(j);
645
646         if (journal_entry_is_open(j)) {
647                 seq = journal_seq;
648         } else if (journal_seq) {
649                 seq = journal_seq - 1;
650         } else {
651                 spin_unlock(&j->lock);
652                 return 0;
653         }
654         spin_unlock(&j->lock);
655
656         return bch2_journal_flush_seq(j, seq);
657 }
658
659 /* block/unlock the journal: */
660
661 void bch2_journal_unblock(struct journal *j)
662 {
663         spin_lock(&j->lock);
664         j->blocked--;
665         spin_unlock(&j->lock);
666
667         journal_wake(j);
668 }
669
670 void bch2_journal_block(struct journal *j)
671 {
672         spin_lock(&j->lock);
673         j->blocked++;
674         spin_unlock(&j->lock);
675
676         journal_quiesce(j);
677 }
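/*
 * bch2_journal_block()/bch2_journal_unblock() nest (j->blocked is a
 * counter) and are used as a pair: while blocked, journal_entry_open()
 * fails with cur_entry_blocked, so no new journal entries can be opened.
 * A minimal usage sketch (the middle step is a placeholder):
 *
 *	bch2_journal_block(&c->journal);
 *	// ... work that must not race with new journal entries ...
 *	bch2_journal_unblock(&c->journal);
 */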
678
679 /* allocate journal on a device: */
680
681 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
682                                          bool new_fs, struct closure *cl)
683 {
684         struct bch_fs *c = ca->fs;
685         struct journal_device *ja = &ca->journal;
686         struct bch_sb_field_journal *journal_buckets;
687         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
688         int ret = 0;
689
690         /* don't handle reducing nr of buckets yet: */
691         if (nr <= ja->nr)
692                 return 0;
693
694         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
695         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
696         if (!new_buckets || !new_bucket_seq) {
697                 ret = -ENOMEM;
698                 goto err;
699         }
700
701         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
702                                         nr + sizeof(*journal_buckets) / sizeof(u64));
703         if (!journal_buckets) {
704                 ret = -ENOSPC;
705                 goto err;
706         }
707
708         /*
709          * We may be called from the device add path, before the new device has
710          * actually been added to the running filesystem:
711          */
712         if (c)
713                 spin_lock(&c->journal.lock);
714
715         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
716         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
717         swap(new_buckets,       ja->buckets);
718         swap(new_bucket_seq,    ja->bucket_seq);
719
720         if (c)
721                 spin_unlock(&c->journal.lock);
722
723         while (ja->nr < nr) {
724                 struct open_bucket *ob = NULL;
725                 unsigned pos;
726                 long bucket;
727
728                 if (new_fs) {
729                         bucket = bch2_bucket_alloc_new_fs(ca);
730                         if (bucket < 0) {
731                                 ret = -ENOSPC;
732                                 goto err;
733                         }
734                 } else {
735                         rcu_read_lock();
736                         ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC,
737                                                false, cl);
738                         rcu_read_unlock();
739                         if (IS_ERR(ob)) {
740                                 ret = cl ? -EAGAIN : -ENOSPC;
741                                 goto err;
742                         }
743
744                         bucket = sector_to_bucket(ca, ob->ptr.offset);
745                 }
746
747                 if (c) {
748                         percpu_down_read(&c->mark_lock);
749                         spin_lock(&c->journal.lock);
750                 }
751
752                 /*
753                  * XXX
754                  * For resize at runtime, we should be writing the new
755                  * superblock before inserting into the journal array
756                  */
757
758                 pos = ja->nr ? (ja->cur_idx + 1) % ja->nr : 0;
759                 __array_insert_item(ja->buckets,                ja->nr, pos);
760                 __array_insert_item(ja->bucket_seq,             ja->nr, pos);
761                 __array_insert_item(journal_buckets->buckets,   ja->nr, pos);
762                 ja->nr++;
763
764                 ja->buckets[pos] = bucket;
765                 ja->bucket_seq[pos] = 0;
766                 journal_buckets->buckets[pos] = cpu_to_le64(bucket);
767
768                 if (pos <= ja->discard_idx)
769                         ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
770                 if (pos <= ja->dirty_idx_ondisk)
771                         ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
772                 if (pos <= ja->dirty_idx)
773                         ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
774                 if (pos <= ja->cur_idx)
775                         ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
776
777                 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_journal,
778                                           ca->mi.bucket_size,
779                                           gc_phase(GC_PHASE_SB),
780                                           0);
781
782                 if (c) {
783                         spin_unlock(&c->journal.lock);
784                         percpu_up_read(&c->mark_lock);
785                 }
786
787                 if (!new_fs)
788                         bch2_open_bucket_put(c, ob);
789         }
790 err:
791         bch2_sb_resize_journal(&ca->disk_sb,
792                 ja->nr + sizeof(*journal_buckets) / sizeof(u64));
793         kfree(new_bucket_seq);
794         kfree(new_buckets);
795
796         return ret;
797 }
798
799 /*
800  * Allocate more journal space at runtime - not currently making use of it, but
801  * the code works:
802  */
803 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
804                                 unsigned nr)
805 {
806         struct journal_device *ja = &ca->journal;
807         struct closure cl;
808         unsigned current_nr;
809         int ret;
810
811         closure_init_stack(&cl);
812
813         do {
814                 struct disk_reservation disk_res = { 0, 0 };
815
816                 closure_sync(&cl);
817
818                 mutex_lock(&c->sb_lock);
819                 current_nr = ja->nr;
820
821                 /*
822                  * note: journal buckets aren't really counted as _sectors_ used yet, so
823                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
824                  * when space used goes up without a reservation - but we do need the
825                  * reservation to ensure we'll actually be able to allocate:
826                  */
827
828                 if (bch2_disk_reservation_get(c, &disk_res,
829                                               bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
830                         mutex_unlock(&c->sb_lock);
831                         return -ENOSPC;
832                 }
833
834                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
835
836                 bch2_disk_reservation_put(c, &disk_res);
837
838                 if (ja->nr != current_nr)
839                         bch2_write_super(c);
840                 mutex_unlock(&c->sb_lock);
841         } while (ret == -EAGAIN);
842
843         return ret;
844 }
845
846 int bch2_dev_journal_alloc(struct bch_dev *ca)
847 {
848         unsigned nr;
849
850         if (dynamic_fault("bcachefs:add:journal_alloc"))
851                 return -ENOMEM;
852
853         /*
854          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
855          * is smaller:
856          */
857         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
858                      BCH_JOURNAL_BUCKETS_MIN,
859                      min(1 << 10,
860                          (1 << 20) / ca->mi.bucket_size));
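        /*
         * e.g. with 1MB buckets (2048 sectors), (1 << 20) / 2048 = 512
         * buckets is the 512MB cap, so a device with 4096 buckets gets
         * clamp(4096 >> 8 = 16, BCH_JOURNAL_BUCKETS_MIN, 512) = 16 journal
         * buckets (illustrative numbers, assuming BCH_JOURNAL_BUCKETS_MIN
         * is no larger than 16).
         */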
861
862         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
863 }
864
865 /* startup/shutdown: */
866
867 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
868 {
869         union journal_res_state state;
870         struct journal_buf *w;
871         bool ret;
872
873         spin_lock(&j->lock);
874         state = READ_ONCE(j->reservations);
875         w = j->buf + !state.idx;
876
877         ret = state.prev_buf_unwritten &&
878                 bch2_bkey_has_device(bkey_i_to_s_c(&w->key), dev_idx);
879         spin_unlock(&j->lock);
880
881         return ret;
882 }
883
884 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
885 {
886         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
887 }
888
889 void bch2_fs_journal_stop(struct journal *j)
890 {
891         bch2_journal_flush_all_pins(j);
892
893         wait_event(j->wait, journal_entry_close(j));
894
895         /*
896          * Always write a new journal entry, to make sure the clock hands are up
897          * to date (and match the superblock)
898          */
899         bch2_journal_meta(j);
900
901         journal_quiesce(j);
902
903         BUG_ON(!bch2_journal_error(j) &&
904                (journal_entry_is_open(j) ||
905                 j->last_empty_seq + 1 != journal_cur_seq(j)));
906
907         cancel_delayed_work_sync(&j->write_work);
908         bch2_journal_reclaim_stop(j);
909 }
910
911 int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
912                           struct list_head *journal_entries)
913 {
914         struct bch_fs *c = container_of(j, struct bch_fs, journal);
915         struct journal_entry_pin_list *p;
916         struct journal_replay *i;
917         u64 last_seq = cur_seq, nr, seq;
918
919         if (!list_empty(journal_entries))
920                 last_seq = le64_to_cpu(list_last_entry(journal_entries,
921                                 struct journal_replay, list)->j.last_seq);
922
923         nr = cur_seq - last_seq;
924
925         if (nr + 1 > j->pin.size) {
926                 free_fifo(&j->pin);
927                 init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
928                 if (!j->pin.data) {
929                         bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
930                         return -ENOMEM;
931                 }
932         }
933
934         j->replay_journal_seq   = last_seq;
935         j->replay_journal_seq_end = cur_seq;
936         j->last_seq_ondisk      = last_seq;
937         j->pin.front            = last_seq;
938         j->pin.back             = cur_seq;
939         atomic64_set(&j->seq, cur_seq - 1);
940
941         fifo_for_each_entry_ptr(p, &j->pin, seq) {
942                 INIT_LIST_HEAD(&p->list);
943                 INIT_LIST_HEAD(&p->flushed);
944                 atomic_set(&p->count, 1);
945                 p->devs.nr = 0;
946         }
947
948         list_for_each_entry(i, journal_entries, list) {
949                 seq = le64_to_cpu(i->j.seq);
950                 BUG_ON(seq >= cur_seq);
951
952                 if (seq < last_seq)
953                         continue;
954
955                 journal_seq_pin(j, seq)->devs = i->devs;
956         }
957
958         spin_lock(&j->lock);
959
960         set_bit(JOURNAL_STARTED, &j->flags);
961
962         journal_pin_new_entry(j, 1);
963
964         j->reservations.idx = journal_cur_seq(j);
965
966         bch2_journal_buf_init(j);
967
968         c->last_bucket_seq_cleanup = journal_cur_seq(j);
969
970         bch2_journal_space_available(j);
971         spin_unlock(&j->lock);
972
973         return 0;
974 }
975
976 /* init/exit: */
977
978 void bch2_dev_journal_exit(struct bch_dev *ca)
979 {
980         kfree(ca->journal.bio);
981         kfree(ca->journal.buckets);
982         kfree(ca->journal.bucket_seq);
983
984         ca->journal.bio         = NULL;
985         ca->journal.buckets     = NULL;
986         ca->journal.bucket_seq  = NULL;
987 }
988
989 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
990 {
991         struct journal_device *ja = &ca->journal;
992         struct bch_sb_field_journal *journal_buckets =
993                 bch2_sb_get_journal(sb);
994         unsigned i;
995
996         ja->nr = bch2_nr_journal_buckets(journal_buckets);
997
998         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
999         if (!ja->bucket_seq)
1000                 return -ENOMEM;
1001
1002         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
1003                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
1004         if (!ca->journal.bio)
1005                 return -ENOMEM;
1006
1007         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1008         if (!ja->buckets)
1009                 return -ENOMEM;
1010
1011         for (i = 0; i < ja->nr; i++)
1012                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
1013
1014         return 0;
1015 }
1016
1017 void bch2_fs_journal_exit(struct journal *j)
1018 {
1019         kvpfree(j->buf[1].data, j->buf[1].buf_size);
1020         kvpfree(j->buf[0].data, j->buf[0].buf_size);
1021         free_fifo(&j->pin);
1022 }
1023
1024 int bch2_fs_journal_init(struct journal *j)
1025 {
1026         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1027         static struct lock_class_key res_key;
1028         int ret = 0;
1029
1030         pr_verbose_init(c->opts, "");
1031
1032         spin_lock_init(&j->lock);
1033         spin_lock_init(&j->err_lock);
1034         init_waitqueue_head(&j->wait);
1035         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
1036         init_waitqueue_head(&j->pin_flush_wait);
1037         mutex_init(&j->reclaim_lock);
1038         mutex_init(&j->discard_lock);
1039
1040         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
1041
1042         j->buf[0].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1043         j->buf[1].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1044         j->write_delay_ms       = 1000;
1045         j->reclaim_delay_ms     = 100;
1046
1047         /* Btree roots: */
1048         j->entry_u64s_reserved +=
1049                 BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1050
1051         atomic64_set(&j->reservations.counter,
1052                 ((union journal_res_state)
1053                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
1054
1055         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
1056             !(j->buf[0].data = kvpmalloc(j->buf[0].buf_size, GFP_KERNEL)) ||
1057             !(j->buf[1].data = kvpmalloc(j->buf[1].buf_size, GFP_KERNEL))) {
1058                 ret = -ENOMEM;
1059                 goto out;
1060         }
1061
1062         j->pin.front = j->pin.back = 1;
1063 out:
1064         pr_verbose_init(c->opts, "ret %i", ret);
1065         return ret;
1066 }
1067
1068 /* debug: */
1069
1070 void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
1071 {
1072         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1073         union journal_res_state s;
1074         struct bch_dev *ca;
1075         unsigned iter;
1076
1077         rcu_read_lock();
1078         spin_lock(&j->lock);
1079         s = READ_ONCE(j->reservations);
1080
1081         pr_buf(out,
1082                "active journal entries:\t%llu\n"
1083                "seq:\t\t\t%llu\n"
1084                "last_seq:\t\t%llu\n"
1085                "last_seq_ondisk:\t%llu\n"
1086                "prereserved:\t\t%u/%u\n"
1087                "nr direct reclaim:\t%llu\n"
1088                "nr background reclaim:\t%llu\n"
1089                "current entry sectors:\t%u\n"
1090                "current entry error:\t%u\n"
1091                "current entry:\t\t",
1092                fifo_used(&j->pin),
1093                journal_cur_seq(j),
1094                journal_last_seq(j),
1095                j->last_seq_ondisk,
1096                j->prereserved.reserved,
1097                j->prereserved.remaining,
1098                j->nr_direct_reclaim,
1099                j->nr_background_reclaim,
1100                j->cur_entry_sectors,
1101                j->cur_entry_error);
1102
1103         switch (s.cur_entry_offset) {
1104         case JOURNAL_ENTRY_ERROR_VAL:
1105                 pr_buf(out, "error\n");
1106                 break;
1107         case JOURNAL_ENTRY_CLOSED_VAL:
1108                 pr_buf(out, "closed\n");
1109                 break;
1110         default:
1111                 pr_buf(out, "%u/%u\n",
1112                        s.cur_entry_offset,
1113                        j->cur_entry_u64s);
1114                 break;
1115         }
1116
1117         pr_buf(out,
1118                "current entry refs:\t%u\n"
1119                "prev entry unwritten:\t",
1120                journal_state_count(s, s.idx));
1121
1122         if (s.prev_buf_unwritten)
1123                 pr_buf(out, "yes, ref %u sectors %u\n",
1124                        journal_state_count(s, !s.idx),
1125                        journal_prev_buf(j)->sectors);
1126         else
1127                 pr_buf(out, "no\n");
1128
1129         pr_buf(out,
1130                "need write:\t\t%i\n"
1131                "replay done:\t\t%i\n",
1132                test_bit(JOURNAL_NEED_WRITE,     &j->flags),
1133                test_bit(JOURNAL_REPLAY_DONE,    &j->flags));
1134
1135         for_each_member_device_rcu(ca, c, iter,
1136                                    &c->rw_devs[BCH_DATA_journal]) {
1137                 struct journal_device *ja = &ca->journal;
1138
1139                 if (!ja->nr)
1140                         continue;
1141
1142                 pr_buf(out,
1143                        "dev %u:\n"
1144                        "\tnr\t\t%u\n"
1145                        "\tavailable\t%u:%u\n"
1146                        "\tdiscard_idx\t\t%u\n"
1147                        "\tdirty_idx_ondisk\t%u (seq %llu)\n"
1148                        "\tdirty_idx\t\t%u (seq %llu)\n"
1149                        "\tcur_idx\t\t%u (seq %llu)\n",
1150                        iter, ja->nr,
1151                        bch2_journal_dev_buckets_available(j, ja, journal_space_discarded),
1152                        ja->sectors_free,
1153                        ja->discard_idx,
1154                        ja->dirty_idx_ondisk,    ja->bucket_seq[ja->dirty_idx_ondisk],
1155                        ja->dirty_idx,           ja->bucket_seq[ja->dirty_idx],
1156                        ja->cur_idx,             ja->bucket_seq[ja->cur_idx]);
1157         }
1158
1159         spin_unlock(&j->lock);
1160         rcu_read_unlock();
1161 }
1162
1163 void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
1164 {
1165         struct journal_entry_pin_list *pin_list;
1166         struct journal_entry_pin *pin;
1167         u64 i;
1168
1169         spin_lock(&j->lock);
1170         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
1171                 pr_buf(out, "%llu: count %u\n",
1172                        i, atomic_read(&pin_list->count));
1173
1174                 list_for_each_entry(pin, &pin_list->list, list)
1175                         pr_buf(out, "\t%px %ps\n",
1176                                pin, pin->flush);
1177
1178                 if (!list_empty(&pin_list->flushed))
1179                         pr_buf(out, "flushed:\n");
1180
1181                 list_for_each_entry(pin, &pin_list->flushed, list)
1182                         pr_buf(out, "\t%px %ps\n",
1183                                pin, pin->flush);
1184         }
1185         spin_unlock(&j->lock);
1186 }