[bcachefs-tools-debian] / libbcachefs / journal.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * bcachefs journalling code, for btree insertions
4  *
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcachefs.h"
9 #include "alloc_foreground.h"
10 #include "bkey_methods.h"
11 #include "btree_gc.h"
12 #include "buckets.h"
13 #include "journal.h"
14 #include "journal_io.h"
15 #include "journal_reclaim.h"
16 #include "journal_seq_blacklist.h"
17 #include "super-io.h"
18
19 #include <trace/events/bcachefs.h>
20
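/*
 * The oldest journal sequence number that hasn't yet been written to disk: the
 * current (open) entry, or the previous one if its write is still in flight:
 */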
21 static u64 last_unwritten_seq(struct journal *j)
22 {
23         union journal_res_state s = READ_ONCE(j->reservations);
24
25         lockdep_assert_held(&j->lock);
26
27         return journal_cur_seq(j) - s.prev_buf_unwritten;
28 }
29
30 static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
31 {
32         return seq >= last_unwritten_seq(j);
33 }
34
35 static bool __journal_entry_is_open(union journal_res_state state)
36 {
37         return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
38 }
39
40 static bool journal_entry_is_open(struct journal *j)
41 {
42         return __journal_entry_is_open(j->reservations);
43 }
44
45 static inline struct journal_buf *
46 journal_seq_to_buf(struct journal *j, u64 seq)
47 {
48         struct journal_buf *buf = NULL;
49
50         EBUG_ON(seq > journal_cur_seq(j));
51         EBUG_ON(seq == journal_cur_seq(j) &&
52                 j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
53
54         if (journal_seq_unwritten(j, seq)) {
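		/* the journal double-buffers: even seqs land in buf[0], odd in buf[1] */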
55                 buf = j->buf + (seq & 1);
56                 EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
57         }
58         return buf;
59 }
60
61 static void journal_pin_new_entry(struct journal *j, int count)
62 {
63         struct journal_entry_pin_list *p;
64
65         /*
66          * The fifo_push() needs to happen at the same time as j->seq is
67          * incremented for journal_last_seq() to be calculated correctly
68          */
69         atomic64_inc(&j->seq);
70         p = fifo_push_ref(&j->pin);
71
72         INIT_LIST_HEAD(&p->list);
73         INIT_LIST_HEAD(&p->flushed);
74         atomic_set(&p->count, count);
75         p->devs.nr = 0;
76 }
77
78 static void bch2_journal_buf_init(struct journal *j)
79 {
80         struct journal_buf *buf = journal_cur_buf(j);
81
82         bkey_extent_init(&buf->key);
83
84         memset(buf->has_inode, 0, sizeof(buf->has_inode));
85
86         memset(buf->data, 0, sizeof(*buf->data));
87         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
88         buf->data->u64s = 0;
89 }
90
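/*
 * Put the journal into an error state: no further entries can be opened, and
 * waiters on the current buffer are woken up:
 */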
91 void bch2_journal_halt(struct journal *j)
92 {
93         union journal_res_state old, new;
94         u64 v = atomic64_read(&j->reservations.counter);
95
96         do {
97                 old.v = new.v = v;
98                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
99                         return;
100
101                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
102         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
103                                        old.v, new.v)) != old.v);
104
105         j->err_seq = journal_cur_seq(j);
106         journal_wake(j);
107         closure_wake_up(&journal_cur_buf(j)->wait);
108 }
109
110 /* journal entry close/open: */
111
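/*
 * Called once the last reference to a closed journal buffer is dropped: kicks
 * off the actual write (bch2_journal_write):
 */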
112 void __bch2_journal_buf_put(struct journal *j, bool need_write_just_set)
113 {
114         if (!need_write_just_set &&
115             test_bit(JOURNAL_NEED_WRITE, &j->flags))
116                 bch2_time_stats_update(j->delay_time,
117                                        j->need_write_time);
118
119         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
120
121         closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
122 }
123
124 /*
125  * Returns true if journal entry is now closed:
126  */
127 static bool __journal_entry_close(struct journal *j)
128 {
129         struct bch_fs *c = container_of(j, struct bch_fs, journal);
130         struct journal_buf *buf = journal_cur_buf(j);
131         union journal_res_state old, new;
132         u64 v = atomic64_read(&j->reservations.counter);
133         bool set_need_write = false;
134         unsigned sectors;
135
136         lockdep_assert_held(&j->lock);
137
138         do {
139                 old.v = new.v = v;
140                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
141                         return true;
142
143                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
144                         /* this entry will never be written: */
145                         closure_wake_up(&buf->wait);
146                         return true;
147                 }
148
149                 if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) {
150                         set_bit(JOURNAL_NEED_WRITE, &j->flags);
151                         j->need_write_time = local_clock();
152                         set_need_write = true;
153                 }
154
155                 if (new.prev_buf_unwritten)
156                         return false;
157
158                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
159                 new.idx++;
160                 new.prev_buf_unwritten = 1;
161
162                 BUG_ON(journal_state_count(new, new.idx));
163         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
164                                        old.v, new.v)) != old.v);
165
166         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
167
168         sectors = vstruct_blocks_plus(buf->data, c->block_bits,
169                                       buf->u64s_reserved) << c->block_bits;
170         BUG_ON(sectors > buf->sectors);
171         buf->sectors = sectors;
172
173         /*
174          * We have to set last_seq here, _before_ opening a new journal entry:
175          *
176          * A thread may replace an old pin with a new pin on its current
177          * journal reservation - the expectation being that the journal will
178          * contain either what the old pin protected or what the new pin
179          * protects.
180          *
181          * After the old pin is dropped journal_last_seq() won't include the old
182          * pin, so we can only write the updated last_seq on the entry that
183          * contains whatever the new pin protects.
184          *
185          * Restated, we can _not_ update last_seq for a given entry if there
186          * could be a newer entry open with reservations/pins that have been
187          * taken against it.
188          *
189          * Hence, we want update/set last_seq on the current journal entry right
190          * before we open a new one:
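         *
         * Concretely: suppose we're closing entry N here. Once entry N + 1 is
         * open, a thread may drop a pin on some old seq and re-take it against
         * N + 1, allowing journal_last_seq() to advance - but that advanced
         * last_seq may only be written in entry N + 1, not in N, since only
         * N + 1 is guaranteed to contain what the new pin protects.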
191          */
192         buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));
193
194         journal_pin_new_entry(j, 1);
195
196         bch2_journal_buf_init(j);
197
198         cancel_delayed_work(&j->write_work);
199
200         bch2_journal_space_available(j);
201
202         bch2_journal_buf_put(j, old.idx, set_need_write);
203         return true;
204 }
205
206 static bool journal_entry_close(struct journal *j)
207 {
208         bool ret;
209
210         spin_lock(&j->lock);
211         ret = __journal_entry_close(j);
212         spin_unlock(&j->lock);
213
214         return ret;
215 }
216
217 /*
218  * should _only_ be called from journal_res_get() - when we actually want a
219  * journal reservation - journal entry is open means journal is dirty:
220  *
221  * returns:
222  * 0:           success
223  * -ENOSPC:     journal currently full, must invoke reclaim
224  * -EAGAIN:     journal blocked, must wait
225  * -EROFS:      insufficient rw devices or journal error
226  */
227 static int journal_entry_open(struct journal *j)
228 {
229         struct bch_fs *c = container_of(j, struct bch_fs, journal);
230         struct journal_buf *buf = journal_cur_buf(j);
231         union journal_res_state old, new;
232         int u64s;
233         u64 v;
234
235         BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));
236
237         lockdep_assert_held(&j->lock);
238         BUG_ON(journal_entry_is_open(j));
239
240         if (j->blocked)
241                 return cur_entry_blocked;
242
243         if (j->cur_entry_error)
244                 return j->cur_entry_error;
245
246         BUG_ON(!j->cur_entry_sectors);
247
248         buf->u64s_reserved      = j->entry_u64s_reserved;
249         buf->disk_sectors       = j->cur_entry_sectors;
250         buf->sectors            = min(buf->disk_sectors, buf->buf_size >> 9);
251
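	/*
	 * Usable space in u64s, e.g. (illustrative numbers): an 8 sector entry
	 * is 4096 bytes = 512 u64s, minus journal_entry_overhead(j):
	 */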
252         u64s = (int) (buf->sectors << 9) / sizeof(u64) -
253                 journal_entry_overhead(j);
254         u64s  = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
255
256         if (u64s <= le32_to_cpu(buf->data->u64s))
257                 return cur_entry_journal_full;
258
259         /*
260          * Must be set before marking the journal entry as open:
261          */
262         j->cur_entry_u64s = u64s;
263
264         v = atomic64_read(&j->reservations.counter);
265         do {
266                 old.v = new.v = v;
267
268                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
269                         return cur_entry_insufficient_devices;
270
271                 /* Handle any already added entries */
272                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
273
274                 EBUG_ON(journal_state_count(new, new.idx));
275                 journal_state_inc(&new);
276         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
277                                        old.v, new.v)) != old.v);
278
279         if (j->res_get_blocked_start)
280                 bch2_time_stats_update(j->blocked_time,
281                                        j->res_get_blocked_start);
282         j->res_get_blocked_start = 0;
283
284         mod_delayed_work(system_freezable_wq,
285                          &j->write_work,
286                          msecs_to_jiffies(j->write_delay_ms));
287         journal_wake(j);
288         return 0;
289 }
290
291 static bool journal_quiesced(struct journal *j)
292 {
293         union journal_res_state state = READ_ONCE(j->reservations);
294         bool ret = !state.prev_buf_unwritten && !__journal_entry_is_open(state);
295
296         if (!ret)
297                 journal_entry_close(j);
298         return ret;
299 }
300
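/* Wait until there is no open journal entry and no write still in flight: */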
301 static void journal_quiesce(struct journal *j)
302 {
303         wait_event(j->wait, journal_quiesced(j));
304 }
305
306 static void journal_write_work(struct work_struct *work)
307 {
308         struct journal *j = container_of(work, struct journal, write_work.work);
309
310         journal_entry_close(j);
311 }
312
313 /*
314  * Given an inode number, if that inode number has data in the journal that
315  * hasn't yet been flushed, return the journal sequence number that needs to be
316  * flushed:
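 *
 * (has_inode is a hash bitmap, so a collision may produce a false positive
 * and an unnecessary - but harmless - flush.)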
317  */
318 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
319 {
320         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
321         u64 seq = 0;
322
323         if (!test_bit(h, j->buf[0].has_inode) &&
324             !test_bit(h, j->buf[1].has_inode))
325                 return 0;
326
327         spin_lock(&j->lock);
328         if (test_bit(h, journal_cur_buf(j)->has_inode))
329                 seq = journal_cur_seq(j);
330         else if (test_bit(h, journal_prev_buf(j)->has_inode))
331                 seq = journal_cur_seq(j) - 1;
332         spin_unlock(&j->lock);
333
334         return seq;
335 }
336
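/*
 * Record that journal entry @seq (if still unwritten) contains keys for
 * @inode, for bch2_inode_journal_seq() above:
 */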
337 void bch2_journal_set_has_inum(struct journal *j, u64 inode, u64 seq)
338 {
339         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
340         struct journal_buf *buf;
341
342         spin_lock(&j->lock);
343
344         if ((buf = journal_seq_to_buf(j, seq)))
345                 set_bit(h, buf->has_inode);
346
347         spin_unlock(&j->lock);
348 }
349
350 static int __journal_res_get(struct journal *j, struct journal_res *res,
351                              unsigned flags)
352 {
353         struct bch_fs *c = container_of(j, struct bch_fs, journal);
354         struct journal_buf *buf;
355         bool can_discard;
356         int ret;
357 retry:
358         if (journal_res_get_fast(j, res, flags))
359                 return 0;
360
361         if (bch2_journal_error(j))
362                 return -EROFS;
363
364         spin_lock(&j->lock);
365
366         /*
367          * Recheck after taking the lock, so we don't race with another thread
368          * that just did journal_entry_open() and call journal_entry_close()
369          * unnecessarily
370          */
371         if (journal_res_get_fast(j, res, flags)) {
372                 spin_unlock(&j->lock);
373                 return 0;
374         }
375
376         if (!(flags & JOURNAL_RES_GET_RESERVED) &&
377             !test_bit(JOURNAL_MAY_GET_UNRESERVED, &j->flags)) {
378                 /*
379                  * Don't want to close current journal entry, just need to
380                  * invoke reclaim:
381                  */
382                 ret = cur_entry_journal_full;
383                 goto unlock;
384         }
385
386         /*
387          * If we couldn't get a reservation because the current buf filled up,
388          * and we had room for a bigger entry on disk, signal that we want to
389          * realloc the journal bufs:
390          */
391         buf = journal_cur_buf(j);
392         if (journal_entry_is_open(j) &&
393             buf->buf_size >> 9 < buf->disk_sectors &&
394             buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
395                 j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
396
397         if (journal_entry_is_open(j) &&
398             !__journal_entry_close(j)) {
399                 /*
400                  * We failed to get a reservation on the current open journal
401                  * entry because it's full, and we can't close it because
402                  * there's still a previous one in flight:
403                  */
404                 trace_journal_entry_full(c);
405                 ret = cur_entry_blocked;
406         } else {
407                 ret = journal_entry_open(j);
408         }
409 unlock:
410         if ((ret && ret != cur_entry_insufficient_devices) &&
411             !j->res_get_blocked_start) {
412                 j->res_get_blocked_start = local_clock() ?: 1;
413                 trace_journal_full(c);
414         }
415
416         can_discard = j->can_discard;
417         spin_unlock(&j->lock);
418
419         if (!ret)
420                 goto retry;
421
422         if (WARN_ONCE(ret == cur_entry_journal_full &&
423                       !can_discard &&
424                       (flags & JOURNAL_RES_GET_RESERVED),
425                       "JOURNAL_RES_GET_RESERVED set but journal full")) {
426                 char *buf;
427
428                 buf = kmalloc(4096, GFP_NOFS);
429                 if (buf) {
430                         bch2_journal_debug_to_text(&_PBUF(buf, 4096), j);
431                         pr_err("\n%s", buf);
432                         kfree(buf);
433                 }
434         }
435
436         /*
437          * Journal is full - can't rely on reclaim from work item due to
438          * freezing:
439          */
440         if ((ret == cur_entry_journal_full ||
441              ret == cur_entry_journal_pin_full) &&
442             !(flags & JOURNAL_RES_GET_NONBLOCK)) {
443                 if (can_discard) {
444                         bch2_journal_do_discards(j);
445                         goto retry;
446                 }
447
448                 if (mutex_trylock(&j->reclaim_lock)) {
449                         bch2_journal_reclaim(j);
450                         mutex_unlock(&j->reclaim_lock);
451                 }
452         }
453
454         return ret == cur_entry_insufficient_devices ? -EROFS : -EAGAIN;
455 }
456
457 /*
458  * Essentially the entry function to the journaling code. When bcachefs is doing
459  * a btree insert, it calls this function to get the current journal write.
460  * Journal write is the structure used to set up journal writes. The calling
461  * function will then add its keys to the structure, queuing them for the next
462  * write.
463  *
464  * To ensure forward progress, the current task must not be holding any
465  * btree node write locks.
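 *
 * Typical usage (illustrative sketch - nr_u64s is a placeholder; see
 * bch2_journal_meta() below for a real caller):
 *
 *	struct journal_res res = { 0 };
 *	int ret = bch2_journal_res_get(j, &res, jset_u64s(nr_u64s), 0);
 *	if (ret)
 *		return ret;
 *	... copy keys into the reservation ...
 *	bch2_journal_res_put(j, &res);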
466  */
467 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
468                                   unsigned flags)
469 {
470         int ret;
471
472         closure_wait_event(&j->async_wait,
473                    (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
474                    (flags & JOURNAL_RES_GET_NONBLOCK));
475         return ret;
476 }
477
478 /* journal_preres: */
479
480 static bool journal_preres_available(struct journal *j,
481                                      struct journal_preres *res,
482                                      unsigned new_u64s,
483                                      unsigned flags)
484 {
485         bool ret = bch2_journal_preres_get_fast(j, res, new_u64s, flags);
486
487         if (!ret && mutex_trylock(&j->reclaim_lock)) {
488                 bch2_journal_reclaim(j);
489                 mutex_unlock(&j->reclaim_lock);
490         }
491
492         return ret;
493 }
494
495 int __bch2_journal_preres_get(struct journal *j,
496                               struct journal_preres *res,
497                               unsigned new_u64s,
498                               unsigned flags)
499 {
500         int ret;
501
502         closure_wait_event(&j->preres_wait,
503                    (ret = bch2_journal_error(j)) ||
504                    journal_preres_available(j, res, new_u64s, flags));
505         return ret;
506 }
507
508 /* journal_entry_res: */
509
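/*
 * Resize a standing per-journal-entry reservation (e.g. the space reserved for
 * btree roots), closing the current entry if it no longer has room:
 */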
510 void bch2_journal_entry_res_resize(struct journal *j,
511                                    struct journal_entry_res *res,
512                                    unsigned new_u64s)
513 {
514         union journal_res_state state;
515         int d = new_u64s - res->u64s;
516
517         spin_lock(&j->lock);
518
519         j->entry_u64s_reserved += d;
520         if (d <= 0)
521                 goto out;
522
523         j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
524         smp_mb();
525         state = READ_ONCE(j->reservations);
526
527         if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
528             state.cur_entry_offset > j->cur_entry_u64s) {
529                 j->cur_entry_u64s += d;
530                 /*
531                  * Not enough room in current journal entry, have to flush it:
532                  */
533                 __journal_entry_close(j);
534         } else {
535                 journal_cur_buf(j)->u64s_reserved += d;
536         }
537 out:
538         spin_unlock(&j->lock);
539         res->u64s += d;
540 }
541
542 /* journal flushing: */
543
544 /**
545  * bch2_journal_flush_seq_async - wait for a journal entry to be written
546  *
547  * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
548  * necessary
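 *
 * Returns 1 if @seq was already written, -EIO if the journal errored at or
 * before @seq, and 0 otherwise (with @parent, if given, waiting on the write).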
549  */
550 int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
551                                  struct closure *parent)
552 {
553         struct journal_buf *buf;
554         int ret = 0;
555
556         if (seq <= j->seq_ondisk)
557                 return 1;
558
559         spin_lock(&j->lock);
560
561         /* Recheck under lock: */
562         if (j->err_seq && seq >= j->err_seq) {
563                 ret = -EIO;
564                 goto out;
565         }
566
567         if (seq <= j->seq_ondisk) {
568                 ret = 1;
569                 goto out;
570         }
571
572         if (parent &&
573             (buf = journal_seq_to_buf(j, seq)))
574                 if (!closure_wait(&buf->wait, parent))
575                         BUG();
576
577         if (seq == journal_cur_seq(j))
578                 __journal_entry_close(j);
579 out:
580         spin_unlock(&j->lock);
581         return ret;
582 }
583
584 int bch2_journal_flush_seq(struct journal *j, u64 seq)
585 {
586         u64 start_time = local_clock();
587         int ret, ret2;
588
589         ret = wait_event_killable(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));
590
591         bch2_time_stats_update(j->flush_seq_time, start_time);
592
593         return ret ?: ret2 < 0 ? ret2 : 0;
594 }
595
596 int bch2_journal_meta(struct journal *j)
597 {
598         struct journal_res res;
599         int ret;
600
601         memset(&res, 0, sizeof(res));
602
603         ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
604         if (ret)
605                 return ret;
606
607         bch2_journal_res_put(j, &res);
608
609         return bch2_journal_flush_seq(j, res.seq);
610 }
611
612 /*
613  * bch2_journal_flush_async - if there is an open journal entry, or a journal
614  * entry still being written, write it and wait for the write to complete
615  */
616 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
617 {
618         u64 seq, journal_seq;
619
620         spin_lock(&j->lock);
621         journal_seq = journal_cur_seq(j);
622
623         if (journal_entry_is_open(j)) {
624                 seq = journal_seq;
625         } else if (journal_seq) {
626                 seq = journal_seq - 1;
627         } else {
628                 spin_unlock(&j->lock);
629                 return;
630         }
631         spin_unlock(&j->lock);
632
633         bch2_journal_flush_seq_async(j, seq, parent);
634 }
635
636 int bch2_journal_flush(struct journal *j)
637 {
638         u64 seq, journal_seq;
639
640         spin_lock(&j->lock);
641         journal_seq = journal_cur_seq(j);
642
643         if (journal_entry_is_open(j)) {
644                 seq = journal_seq;
645         } else if (journal_seq) {
646                 seq = journal_seq - 1;
647         } else {
648                 spin_unlock(&j->lock);
649                 return 0;
650         }
651         spin_unlock(&j->lock);
652
653         return bch2_journal_flush_seq(j, seq);
654 }
655
656 /* block/unblock the journal: */
657
658 void bch2_journal_unblock(struct journal *j)
659 {
660         spin_lock(&j->lock);
661         j->blocked--;
662         spin_unlock(&j->lock);
663
664         journal_wake(j);
665 }
666
667 void bch2_journal_block(struct journal *j)
668 {
669         spin_lock(&j->lock);
670         j->blocked++;
671         spin_unlock(&j->lock);
672
673         journal_quiesce(j);
674 }
675
676 /* allocate journal on a device: */
677
678 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
679                                          bool new_fs, struct closure *cl)
680 {
681         struct bch_fs *c = ca->fs;
682         struct journal_device *ja = &ca->journal;
683         struct bch_sb_field_journal *journal_buckets;
684         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
685         int ret = 0;
686
687         /* don't handle reducing nr of buckets yet: */
688         if (nr <= ja->nr)
689                 return 0;
690
691         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
692         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
693         if (!new_buckets || !new_bucket_seq) {
694                 ret = -ENOMEM;
695                 goto err;
696         }
697
698         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
699                                         nr + sizeof(*journal_buckets) / sizeof(u64));
700         if (!journal_buckets) {
701                 ret = -ENOSPC;
702                 goto err;
703         }
704
705         /*
706          * We may be called from the device add path, before the new device has
707          * actually been added to the running filesystem:
708          */
709         if (c)
710                 spin_lock(&c->journal.lock);
711
712         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
713         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
714         swap(new_buckets,       ja->buckets);
715         swap(new_bucket_seq,    ja->bucket_seq);
716
717         if (c)
718                 spin_unlock(&c->journal.lock);
719
720         while (ja->nr < nr) {
721                 struct open_bucket *ob = NULL;
722                 unsigned pos;
723                 long bucket;
724
725                 if (new_fs) {
726                         bucket = bch2_bucket_alloc_new_fs(ca);
727                         if (bucket < 0) {
728                                 ret = -ENOSPC;
729                                 goto err;
730                         }
731                 } else {
732                         rcu_read_lock();
733                         ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC,
734                                                false, cl);
735                         rcu_read_unlock();
736                         if (IS_ERR(ob)) {
737                                 ret = cl ? -EAGAIN : -ENOSPC;
738                                 goto err;
739                         }
740
741                         bucket = sector_to_bucket(ca, ob->ptr.offset);
742                 }
743
744                 if (c) {
745                         percpu_down_read(&c->mark_lock);
746                         spin_lock(&c->journal.lock);
747                 }
748
749                 /*
750                  * XXX
751                  * For resize at runtime, we should be writing the new
752                  * superblock before inserting into the journal array
753                  */
754
755                 pos = ja->nr ? (ja->cur_idx + 1) % ja->nr : 0;
756                 __array_insert_item(ja->buckets,                ja->nr, pos);
757                 __array_insert_item(ja->bucket_seq,             ja->nr, pos);
758                 __array_insert_item(journal_buckets->buckets,   ja->nr, pos);
759                 ja->nr++;
760
761                 ja->buckets[pos] = bucket;
762                 ja->bucket_seq[pos] = 0;
763                 journal_buckets->buckets[pos] = cpu_to_le64(bucket);
764
765                 if (pos <= ja->discard_idx)
766                         ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
767                 if (pos <= ja->dirty_idx_ondisk)
768                         ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
769                 if (pos <= ja->dirty_idx)
770                         ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
771                 if (pos <= ja->cur_idx)
772                         ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
773
774                 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_journal,
775                                           ca->mi.bucket_size,
776                                           gc_phase(GC_PHASE_SB),
777                                           0);
778
779                 if (c) {
780                         spin_unlock(&c->journal.lock);
781                         percpu_up_read(&c->mark_lock);
782                 }
783
784                 if (!new_fs)
785                         bch2_open_bucket_put(c, ob);
786         }
787 err:
788         bch2_sb_resize_journal(&ca->disk_sb,
789                 ja->nr + sizeof(*journal_buckets) / sizeof(u64));
790         kfree(new_bucket_seq);
791         kfree(new_buckets);
792
793         return ret;
794 }
795
796 /*
797  * Allocate more journal space at runtime - not currently making use of it, but
798  * the code works:
799  */
800 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
801                                 unsigned nr)
802 {
803         struct journal_device *ja = &ca->journal;
804         struct closure cl;
805         unsigned current_nr;
806         int ret;
807
808         closure_init_stack(&cl);
809
810         do {
811                 struct disk_reservation disk_res = { 0, 0 };
812
813                 closure_sync(&cl);
814
815                 mutex_lock(&c->sb_lock);
816                 current_nr = ja->nr;
817
818                 /*
819                  * note: journal buckets aren't really counted as _sectors_ used yet, so
820                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
821                  * when space used goes up without a reservation - but we do need the
822                  * reservation to ensure we'll actually be able to allocate:
823                  */
824
825                 if (bch2_disk_reservation_get(c, &disk_res,
826                                               bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
827                         mutex_unlock(&c->sb_lock);
828                         return -ENOSPC;
829                 }
830
831                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
832
833                 bch2_disk_reservation_put(c, &disk_res);
834
835                 if (ja->nr != current_nr)
836                         bch2_write_super(c);
837                 mutex_unlock(&c->sb_lock);
838         } while (ret == -EAGAIN);
839
840         return ret;
841 }
842
843 int bch2_dev_journal_alloc(struct bch_dev *ca)
844 {
845         unsigned nr;
846
847         if (dynamic_fault("bcachefs:add:journal_alloc"))
848                 return -ENOMEM;
849
850         /*
851          * clamp journal size to 1024 buckets or 512MB (1 << 20 sectors), whichever
852          * is smaller:
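         *
         * e.g. (illustrative numbers): a device with 1 << 20 buckets of 512
         * sectors each gives nbuckets >> 8 = 4096, capped at
         * min(1024, (1 << 20) / 512 = 2048) = 1024 journal buckets: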
853          */
854         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
855                      BCH_JOURNAL_BUCKETS_MIN,
856                      min(1 << 10,
857                          (1 << 20) / ca->mi.bucket_size));
858
859         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
860 }
861
862 /* startup/shutdown: */
863
864 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
865 {
866         union journal_res_state state;
867         struct journal_buf *w;
868         bool ret;
869
870         spin_lock(&j->lock);
871         state = READ_ONCE(j->reservations);
872         w = j->buf + !state.idx;
873
874         ret = state.prev_buf_unwritten &&
875                 bch2_bkey_has_device(bkey_i_to_s_c(&w->key), dev_idx);
876         spin_unlock(&j->lock);
877
878         return ret;
879 }
880
881 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
882 {
883         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
884 }
885
886 void bch2_fs_journal_stop(struct journal *j)
887 {
888         bch2_journal_flush_all_pins(j);
889
890         wait_event(j->wait, journal_entry_close(j));
891
892         /*
893          * Always write a new journal entry, to make sure the clock hands are up
894          * to date (and match the superblock)
895          */
896         bch2_journal_meta(j);
897
898         journal_quiesce(j);
899
900         BUG_ON(!bch2_journal_error(j) &&
901                (journal_entry_is_open(j) ||
902                 j->last_empty_seq + 1 != journal_cur_seq(j)));
903
904         cancel_delayed_work_sync(&j->write_work);
905         bch2_journal_reclaim_stop(j);
906 }
907
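/*
 * Set up the journal's pin fifo and sequence counters from the entries found
 * by journal read, before we start using the journal:
 */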
908 int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
909                           struct list_head *journal_entries)
910 {
911         struct bch_fs *c = container_of(j, struct bch_fs, journal);
912         struct journal_entry_pin_list *p;
913         struct journal_replay *i;
914         u64 last_seq = cur_seq, nr, seq;
915
916         if (!list_empty(journal_entries))
917                 last_seq = le64_to_cpu(list_last_entry(journal_entries,
918                                 struct journal_replay, list)->j.last_seq);
919
920         nr = cur_seq - last_seq;
921
922         if (nr + 1 > j->pin.size) {
923                 free_fifo(&j->pin);
924                 init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
925                 if (!j->pin.data) {
926                         bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
927                         return -ENOMEM;
928                 }
929         }
930
931         j->replay_journal_seq   = last_seq;
932         j->replay_journal_seq_end = cur_seq;
933         j->last_seq_ondisk      = last_seq;
934         j->pin.front            = last_seq;
935         j->pin.back             = cur_seq;
936         atomic64_set(&j->seq, cur_seq - 1);
937
938         fifo_for_each_entry_ptr(p, &j->pin, seq) {
939                 INIT_LIST_HEAD(&p->list);
940                 INIT_LIST_HEAD(&p->flushed);
941                 atomic_set(&p->count, 1);
942                 p->devs.nr = 0;
943         }
944
945         list_for_each_entry(i, journal_entries, list) {
946                 seq = le64_to_cpu(i->j.seq);
947                 BUG_ON(seq >= cur_seq);
948
949                 if (seq < last_seq)
950                         continue;
951
952                 journal_seq_pin(j, seq)->devs = i->devs;
953         }
954
955         spin_lock(&j->lock);
956
957         set_bit(JOURNAL_STARTED, &j->flags);
958
959         journal_pin_new_entry(j, 1);
960
961         j->reservations.idx = journal_cur_seq(j);
962
963         bch2_journal_buf_init(j);
964
965         c->last_bucket_seq_cleanup = journal_cur_seq(j);
966
967         bch2_journal_space_available(j);
968         spin_unlock(&j->lock);
969
970         return 0;
971 }
972
973 /* init/exit: */
974
975 void bch2_dev_journal_exit(struct bch_dev *ca)
976 {
977         kfree(ca->journal.bio);
978         kfree(ca->journal.buckets);
979         kfree(ca->journal.bucket_seq);
980
981         ca->journal.bio         = NULL;
982         ca->journal.buckets     = NULL;
983         ca->journal.bucket_seq  = NULL;
984 }
985
986 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
987 {
988         struct journal_device *ja = &ca->journal;
989         struct bch_sb_field_journal *journal_buckets =
990                 bch2_sb_get_journal(sb);
991         unsigned i;
992
993         ja->nr = bch2_nr_journal_buckets(journal_buckets);
994
995         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
996         if (!ja->bucket_seq)
997                 return -ENOMEM;
998
999         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
1000                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
1001         if (!ca->journal.bio)
1002                 return -ENOMEM;
1003
1004         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1005         if (!ja->buckets)
1006                 return -ENOMEM;
1007
1008         for (i = 0; i < ja->nr; i++)
1009                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
1010
1011         return 0;
1012 }
1013
1014 void bch2_fs_journal_exit(struct journal *j)
1015 {
1016         kvpfree(j->buf[1].data, j->buf[1].buf_size);
1017         kvpfree(j->buf[0].data, j->buf[0].buf_size);
1018         free_fifo(&j->pin);
1019 }
1020
1021 int bch2_fs_journal_init(struct journal *j)
1022 {
1023         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1024         static struct lock_class_key res_key;
1025         int ret = 0;
1026
1027         pr_verbose_init(c->opts, "");
1028
1029         spin_lock_init(&j->lock);
1030         spin_lock_init(&j->err_lock);
1031         init_waitqueue_head(&j->wait);
1032         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
1033         init_waitqueue_head(&j->pin_flush_wait);
1034         mutex_init(&j->reclaim_lock);
1035         mutex_init(&j->discard_lock);
1036
1037         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
1038
1039         j->buf[0].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1040         j->buf[1].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1041         j->write_delay_ms       = 1000;
1042         j->reclaim_delay_ms     = 100;
1043
1044         /* Btree roots: */
1045         j->entry_u64s_reserved +=
1046                 BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1047
1048         atomic64_set(&j->reservations.counter,
1049                 ((union journal_res_state)
1050                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
1051
1052         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
1053             !(j->buf[0].data = kvpmalloc(j->buf[0].buf_size, GFP_KERNEL)) ||
1054             !(j->buf[1].data = kvpmalloc(j->buf[1].buf_size, GFP_KERNEL))) {
1055                 ret = -ENOMEM;
1056                 goto out;
1057         }
1058
1059         j->pin.front = j->pin.back = 1;
1060 out:
1061         pr_verbose_init(c->opts, "ret %i", ret);
1062         return ret;
1063 }
1064
1065 /* debug: */
1066
1067 void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
1068 {
1069         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1070         union journal_res_state s;
1071         struct bch_dev *ca;
1072         unsigned iter;
1073
1074         rcu_read_lock();
1075         spin_lock(&j->lock);
1076         s = READ_ONCE(j->reservations);
1077
1078         pr_buf(out,
1079                "active journal entries:\t%llu\n"
1080                "seq:\t\t\t%llu\n"
1081                "last_seq:\t\t%llu\n"
1082                "last_seq_ondisk:\t%llu\n"
1083                "prereserved:\t\t%u/%u\n"
1084                "nr direct reclaim:\t%llu\n"
1085                "nr background reclaim:\t%llu\n"
1086                "current entry sectors:\t%u\n"
1087                "current entry error:\t%u\n"
1088                "current entry:\t\t",
1089                fifo_used(&j->pin),
1090                journal_cur_seq(j),
1091                journal_last_seq(j),
1092                j->last_seq_ondisk,
1093                j->prereserved.reserved,
1094                j->prereserved.remaining,
1095                j->nr_direct_reclaim,
1096                j->nr_background_reclaim,
1097                j->cur_entry_sectors,
1098                j->cur_entry_error);
1099
1100         switch (s.cur_entry_offset) {
1101         case JOURNAL_ENTRY_ERROR_VAL:
1102                 pr_buf(out, "error\n");
1103                 break;
1104         case JOURNAL_ENTRY_CLOSED_VAL:
1105                 pr_buf(out, "closed\n");
1106                 break;
1107         default:
1108                 pr_buf(out, "%u/%u\n",
1109                        s.cur_entry_offset,
1110                        j->cur_entry_u64s);
1111                 break;
1112         }
1113
1114         pr_buf(out,
1115                "current entry refs:\t%u\n"
1116                "prev entry unwritten:\t",
1117                journal_state_count(s, s.idx));
1118
1119         if (s.prev_buf_unwritten)
1120                 pr_buf(out, "yes, ref %u sectors %u\n",
1121                        journal_state_count(s, !s.idx),
1122                        journal_prev_buf(j)->sectors);
1123         else
1124                 pr_buf(out, "no\n");
1125
1126         pr_buf(out,
1127                "need write:\t\t%i\n"
1128                "replay done:\t\t%i\n",
1129                test_bit(JOURNAL_NEED_WRITE,     &j->flags),
1130                test_bit(JOURNAL_REPLAY_DONE,    &j->flags));
1131
1132         for_each_member_device_rcu(ca, c, iter,
1133                                    &c->rw_devs[BCH_DATA_journal]) {
1134                 struct journal_device *ja = &ca->journal;
1135
1136                 if (!ja->nr)
1137                         continue;
1138
1139                 pr_buf(out,
1140                        "dev %u:\n"
1141                        "\tnr\t\t%u\n"
1142                        "\tavailable\t%u:%u\n"
1143                        "\tdiscard_idx\t\t%u\n"
1144                        "\tdirty_idx_ondisk\t%u (seq %llu)\n"
1145                        "\tdirty_idx\t\t%u (seq %llu)\n"
1146                        "\tcur_idx\t\t%u (seq %llu)\n",
1147                        iter, ja->nr,
1148                        bch2_journal_dev_buckets_available(j, ja, journal_space_discarded),
1149                        ja->sectors_free,
1150                        ja->discard_idx,
1151                        ja->dirty_idx_ondisk,    ja->bucket_seq[ja->dirty_idx_ondisk],
1152                        ja->dirty_idx,           ja->bucket_seq[ja->dirty_idx],
1153                        ja->cur_idx,             ja->bucket_seq[ja->cur_idx]);
1154         }
1155
1156         spin_unlock(&j->lock);
1157         rcu_read_unlock();
1158 }
1159
1160 void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
1161 {
1162         struct journal_entry_pin_list *pin_list;
1163         struct journal_entry_pin *pin;
1164         u64 i;
1165
1166         spin_lock(&j->lock);
1167         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
1168                 pr_buf(out, "%llu: count %u\n",
1169                        i, atomic_read(&pin_list->count));
1170
1171                 list_for_each_entry(pin, &pin_list->list, list)
1172                         pr_buf(out, "\t%px %ps\n",
1173                                pin, pin->flush);
1174
1175                 if (!list_empty(&pin_list->flushed))
1176                         pr_buf(out, "flushed:\n");
1177
1178                 list_for_each_entry(pin, &pin_list->flushed, list)
1179                         pr_buf(out, "\t%px %ps\n",
1180                                pin, pin->flush);
1181         }
1182         spin_unlock(&j->lock);
1183 }