1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * bcachefs journalling code, for btree insertions
4  *
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcachefs.h"
9 #include "alloc_foreground.h"
10 #include "bkey_methods.h"
11 #include "btree_gc.h"
12 #include "btree_update.h"
13 #include "buckets.h"
14 #include "error.h"
15 #include "journal.h"
16 #include "journal_io.h"
17 #include "journal_reclaim.h"
18 #include "journal_seq_blacklist.h"
19 #include "super-io.h"
20
21 #include <trace/events/bcachefs.h>
22
23 static u64 last_unwritten_seq(struct journal *j)
24 {
25         union journal_res_state s = READ_ONCE(j->reservations);
26
27         lockdep_assert_held(&j->lock);
28
29         return journal_cur_seq(j) - ((s.idx - s.unwritten_idx) & JOURNAL_BUF_MASK);
30 }
31
32 static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
33 {
34         return seq >= last_unwritten_seq(j);
35 }
36
37 static bool __journal_entry_is_open(union journal_res_state state)
38 {
39         return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
40 }
41
42 static bool journal_entry_is_open(struct journal *j)
43 {
44         return __journal_entry_is_open(j->reservations);
45 }
46
47 static inline struct journal_buf *
48 journal_seq_to_buf(struct journal *j, u64 seq)
49 {
50         struct journal_buf *buf = NULL;
51
52         EBUG_ON(seq > journal_cur_seq(j));
53         EBUG_ON(seq == journal_cur_seq(j) &&
54                 j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
55
56         if (journal_seq_unwritten(j, seq)) {
57                 buf = j->buf + (seq & JOURNAL_BUF_MASK);
58                 EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
59         }
60         return buf;
61 }
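/*
 * Worked example (illustrative, assuming JOURNAL_BUF_NR == 4, so
 * JOURNAL_BUF_MASK == 3): if journal_cur_seq(j) == 10 with s.idx == 2 and
 * s.unwritten_idx == 0, then last_unwritten_seq() == 10 - ((2 - 0) & 3) == 8;
 * seqs 8 and 9 are still unwritten, and journal_seq_to_buf(j, 9) returns
 * j->buf + (9 & 3) == &j->buf[1].
 */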
62
63 static void journal_pin_list_init(struct journal_entry_pin_list *p, int count)
64 {
65         INIT_LIST_HEAD(&p->list);
66         INIT_LIST_HEAD(&p->key_cache_list);
67         INIT_LIST_HEAD(&p->flushed);
68         atomic_set(&p->count, count);
69         p->devs.nr = 0;
70 }
71
72 static void journal_pin_new_entry(struct journal *j)
73 {
74         /*
75          * The fifo_push() needs to happen at the same time as j->seq is
76          * incremented for journal_last_seq() to be calculated correctly
77          */
78         atomic64_inc(&j->seq);
79         journal_pin_list_init(fifo_push_ref(&j->pin), 1);
80 }
81
82 static void bch2_journal_buf_init(struct journal *j)
83 {
84         struct journal_buf *buf = journal_cur_buf(j);
85
86         bkey_extent_init(&buf->key);
87         buf->noflush    = false;
88         buf->must_flush = false;
89         buf->separate_flush = false;
90
91         memset(buf->has_inode, 0, sizeof(buf->has_inode));
92
93         memset(buf->data, 0, sizeof(*buf->data));
94         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
95         buf->data->u64s = 0;
96 }
97
98 void bch2_journal_halt(struct journal *j)
99 {
100         union journal_res_state old, new;
101         u64 v = atomic64_read(&j->reservations.counter);
102
103         do {
104                 old.v = new.v = v;
105                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
106                         return;
107
108                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
109         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
110                                        old.v, new.v)) != old.v);
111
112         j->err_seq = journal_cur_seq(j);
113         journal_wake(j);
114         closure_wake_up(&journal_cur_buf(j)->wait);
115 }
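/*
 * Note on the update idiom above (used throughout this file): the journal's
 * reservation state - open/closed/error, buffer indices and per-buffer
 * refcounts - is packed into the single 64 bit word j->reservations, so state
 * transitions are made with an atomic64_cmpxchg() loop that retries if
 * another thread changed the word between the read and the cmpxchg.
 */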
116
117 /* journal entry close/open: */
118
119 void __bch2_journal_buf_put(struct journal *j)
120 {
121         struct bch_fs *c = container_of(j, struct bch_fs, journal);
122
123         closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
124 }
125
126 /*
127  * Returns true if journal entry is now closed:
128  *
129  * We don't close a journal_buf until the next journal_buf is finished writing,
130  * and can be opened again - this also initializes the next journal_buf:
131  */
132 static bool __journal_entry_close(struct journal *j)
133 {
134         struct bch_fs *c = container_of(j, struct bch_fs, journal);
135         struct journal_buf *buf = journal_cur_buf(j);
136         union journal_res_state old, new;
137         u64 v = atomic64_read(&j->reservations.counter);
138         unsigned sectors;
139
140         lockdep_assert_held(&j->lock);
141
142         do {
143                 old.v = new.v = v;
144                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
145                         return true;
146
147                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
148                         /* this entry will never be written: */
149                         closure_wake_up(&buf->wait);
150                         return true;
151                 }
152
153                 if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) {
154                         set_bit(JOURNAL_NEED_WRITE, &j->flags);
155                         j->need_write_time = local_clock();
156                 }
157
158                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
159                 new.idx++;
160
161                 if (new.idx == new.unwritten_idx)
162                         return false;
163
164                 BUG_ON(journal_state_count(new, new.idx));
165         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
166                                        old.v, new.v)) != old.v);
167
168         /* Close out old buffer: */
169         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
170
171         sectors = vstruct_blocks_plus(buf->data, c->block_bits,
172                                       buf->u64s_reserved) << c->block_bits;
173         BUG_ON(sectors > buf->sectors);
174         buf->sectors = sectors;
175
176         /*
177          * We have to set last_seq here, _before_ opening a new journal entry:
178          *
179          * A thread may replace an old pin with a new pin on its current
180          * journal reservation - the expectation being that the journal will
181          * contain either what the old pin protected or what the new pin
182          * protects.
183          *
184          * After the old pin is dropped journal_last_seq() won't include the old
185          * pin, so we can only write the updated last_seq on the entry that
186          * contains whatever the new pin protects.
187          *
188          * Restated, we can _not_ update last_seq for a given entry if there
189          * could be a newer entry open with reservations/pins that have been
190          * taken against it.
191          *
192          * Hence, we want to update/set last_seq on the current journal entry right
193          * before we open a new one:
194          */
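	/*
	 * Concrete example (illustrative): a thread with a reservation against
	 * the current entry, say seq 9, drops its pin on seq 5 and re-takes it
	 * against seq 9. journal_last_seq() may now be > 5, but only the entry
	 * that will contain seq 9's pin - this one - may advertise that, which
	 * is why last_seq is captured here rather than when the next entry is
	 * opened.
	 */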
195         buf->last_seq           = journal_last_seq(j);
196         buf->data->last_seq     = cpu_to_le64(buf->last_seq);
197
198         __bch2_journal_pin_put(j, le64_to_cpu(buf->data->seq));
199
200         /* Initialize new buffer: */
201         journal_pin_new_entry(j);
202
203         bch2_journal_buf_init(j);
204
205         cancel_delayed_work(&j->write_work);
206         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
207
208         bch2_journal_space_available(j);
209
210         bch2_journal_buf_put(j, old.idx);
211         return true;
212 }
213
214 static bool journal_entry_want_write(struct journal *j)
215 {
216         union journal_res_state s = READ_ONCE(j->reservations);
217         bool ret = false;
218
219         /*
220          * Don't close it yet if we already have a write in flight, but do set
221          * NEED_WRITE:
222          */
223         if (s.idx != s.unwritten_idx)
224                 set_bit(JOURNAL_NEED_WRITE, &j->flags);
225         else
226                 ret = __journal_entry_close(j);
227
228         return ret;
229 }
230
231 static bool journal_entry_close(struct journal *j)
232 {
233         bool ret;
234
235         spin_lock(&j->lock);
236         ret = journal_entry_want_write(j);
237         spin_unlock(&j->lock);
238
239         return ret;
240 }
241
242 /*
243  * should _only_ be called from journal_res_get() - when we actually want a
244  * journal reservation - an open journal entry means the journal is dirty:
245  *
246  * returns:
247  * 0:           success
248  * -ENOSPC:     journal currently full, must invoke reclaim
249  * -EAGAIN:     journal blocked, must wait
250  * -EROFS:      insufficient rw devices or journal error
251  */
252 static int journal_entry_open(struct journal *j)
253 {
254         struct bch_fs *c = container_of(j, struct bch_fs, journal);
255         struct journal_buf *buf = journal_cur_buf(j);
256         union journal_res_state old, new;
257         int u64s;
258         u64 v;
259
260         BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));
261
262         lockdep_assert_held(&j->lock);
263         BUG_ON(journal_entry_is_open(j));
264
265         if (j->blocked)
266                 return cur_entry_blocked;
267
268         if (j->cur_entry_error)
269                 return j->cur_entry_error;
270
271         BUG_ON(!j->cur_entry_sectors);
272
273         buf->u64s_reserved      = j->entry_u64s_reserved;
274         buf->disk_sectors       = j->cur_entry_sectors;
275         buf->sectors            = min(buf->disk_sectors, buf->buf_size >> 9);
276
277         u64s = (int) (buf->sectors << 9) / sizeof(u64) -
278                 journal_entry_overhead(j);
279         u64s  = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
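	/*
	 * e.g. (illustrative): buf->sectors == 8 gives 8 << 9 == 4096 bytes ==
	 * 512 u64s of space, less journal_entry_overhead(j), then clamped so
	 * it stays representable in the cur_entry_offset field.
	 */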
280
281         if (u64s <= le32_to_cpu(buf->data->u64s))
282                 return cur_entry_journal_full;
283
284         /*
285          * Must be set before marking the journal entry as open:
286          */
287         j->cur_entry_u64s = u64s;
288
289         v = atomic64_read(&j->reservations.counter);
290         do {
291                 old.v = new.v = v;
292
293                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
294                         return cur_entry_insufficient_devices;
295
296                 /* Handle any already added entries */
297                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
298
299                 EBUG_ON(journal_state_count(new, new.idx));
300                 journal_state_inc(&new);
301         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
302                                        old.v, new.v)) != old.v);
303
304         if (j->res_get_blocked_start)
305                 bch2_time_stats_update(j->blocked_time,
306                                        j->res_get_blocked_start);
307         j->res_get_blocked_start = 0;
308
309         mod_delayed_work(c->io_complete_wq,
310                          &j->write_work,
311                          msecs_to_jiffies(j->write_delay_ms));
312         journal_wake(j);
313         return 0;
314 }
315
316 static bool journal_quiesced(struct journal *j)
317 {
318         union journal_res_state s = READ_ONCE(j->reservations);
319         bool ret = s.idx == s.unwritten_idx && !__journal_entry_is_open(s);
320
321         if (!ret)
322                 journal_entry_close(j);
323         return ret;
324 }
325
326 static void journal_quiesce(struct journal *j)
327 {
328         wait_event(j->wait, journal_quiesced(j));
329 }
330
331 static void journal_write_work(struct work_struct *work)
332 {
333         struct journal *j = container_of(work, struct journal, write_work.work);
334
335         journal_entry_close(j);
336 }
337
338 /*
339  * Given an inode number, if that inode number has data in the journal that
340  * hasn't yet been flushed, return the journal sequence number that needs to be
341  * flushed:
342  */
343 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
344 {
345         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
346         union journal_res_state s;
347         unsigned i;
348         u64 seq;
349
350
351         spin_lock(&j->lock);
352         seq = journal_cur_seq(j);
353         s = READ_ONCE(j->reservations);
354         i = s.idx;
355
356         while (1) {
357                 if (test_bit(h, j->buf[i].has_inode))
358                         goto out;
359
360                 if (i == s.unwritten_idx)
361                         break;
362
363                 i = (i - 1) & JOURNAL_BUF_MASK;
364                 seq--;
365         }
366
367         seq = 0;
368 out:
369         spin_unlock(&j->lock);
370
371         return seq;
372 }
373
374 void bch2_journal_set_has_inum(struct journal *j, u64 inode, u64 seq)
375 {
376         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
377         struct journal_buf *buf;
378
379         spin_lock(&j->lock);
380
381         if ((buf = journal_seq_to_buf(j, seq)))
382                 set_bit(h, buf->has_inode);
383
384         spin_unlock(&j->lock);
385 }
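/*
 * Illustrative usage of the two helpers above (a sketch, not code from this
 * file) - an fsync-style caller might do:
 *
 *	u64 seq = bch2_inode_journal_seq(j, inum);
 *	if (seq)
 *		ret = bch2_journal_flush_seq(j, seq);
 *
 * i.e. only force a journal flush when the inode still has keys in one of the
 * in-flight journal bufs.
 */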
386
387 static int __journal_res_get(struct journal *j, struct journal_res *res,
388                              unsigned flags)
389 {
390         struct bch_fs *c = container_of(j, struct bch_fs, journal);
391         struct journal_buf *buf;
392         bool can_discard;
393         int ret;
394 retry:
395         if (journal_res_get_fast(j, res, flags))
396                 return 0;
397
398         if (bch2_journal_error(j))
399                 return -EROFS;
400
401         spin_lock(&j->lock);
402
403         /*
404          * Recheck after taking the lock, so we don't race with another thread
405          * that just did journal_entry_open(), and don't call journal_entry_close()
406          * unnecessarily
407          */
408         if (journal_res_get_fast(j, res, flags)) {
409                 spin_unlock(&j->lock);
410                 return 0;
411         }
412
413         if (!(flags & JOURNAL_RES_GET_RESERVED) &&
414             !test_bit(JOURNAL_MAY_GET_UNRESERVED, &j->flags)) {
415                 /*
416                  * Don't want to close current journal entry, just need to
417                  * invoke reclaim:
418                  */
419                 ret = cur_entry_journal_full;
420                 goto unlock;
421         }
422
423         /*
424          * If we couldn't get a reservation because the current buf filled up,
425          * and we had room for a bigger entry on disk, signal that we want to
426          * realloc the journal bufs:
427          */
428         buf = journal_cur_buf(j);
429         if (journal_entry_is_open(j) &&
430             buf->buf_size >> 9 < buf->disk_sectors &&
431             buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
432                 j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
433
434         if (journal_entry_is_open(j) &&
435             !__journal_entry_close(j)) {
436                 /*
437                  * We failed to get a reservation on the current open journal
438                  * entry because it's full, and we can't close it because
439                  * there's still a previous one in flight:
440                  */
441                 trace_journal_entry_full(c);
442                 ret = cur_entry_blocked;
443         } else {
444                 ret = journal_entry_open(j);
445         }
446 unlock:
447         if ((ret && ret != cur_entry_insufficient_devices) &&
448             !j->res_get_blocked_start) {
449                 j->res_get_blocked_start = local_clock() ?: 1;
450                 trace_journal_full(c);
451         }
452
453         can_discard = j->can_discard;
454         spin_unlock(&j->lock);
455
456         if (!ret)
457                 goto retry;
458
459         if ((ret == cur_entry_journal_full ||
460              ret == cur_entry_journal_pin_full) &&
461             !can_discard &&
462             j->reservations.idx == j->reservations.unwritten_idx &&
463             (flags & JOURNAL_RES_GET_RESERVED)) {
464                 char *journal_debug_buf = kmalloc(4096, GFP_ATOMIC);
465
466                 bch_err(c, "Journal stuck!");
467                 if (journal_debug_buf) {
468                         bch2_journal_debug_to_text(&_PBUF(journal_debug_buf, 4096), j);
469                         bch_err(c, "%s", journal_debug_buf);
470
471                         bch2_journal_pins_to_text(&_PBUF(journal_debug_buf, 4096), j);
472                         bch_err(c, "Journal pins:\n%s", journal_debug_buf);
473                         kfree(journal_debug_buf);
474                 }
475
476                 bch2_fatal_error(c);
477                 dump_stack();
478         }
479
480         /*
481          * Journal is full - can't rely on reclaim from work item due to
482          * freezing:
483          */
484         if ((ret == cur_entry_journal_full ||
485              ret == cur_entry_journal_pin_full) &&
486             !(flags & JOURNAL_RES_GET_NONBLOCK)) {
487                 if (can_discard) {
488                         bch2_journal_do_discards(j);
489                         goto retry;
490                 }
491
492                 if (mutex_trylock(&j->reclaim_lock)) {
493                         bch2_journal_reclaim(j);
494                         mutex_unlock(&j->reclaim_lock);
495                 }
496         }
497
498         return ret == cur_entry_insufficient_devices ? -EROFS : -EAGAIN;
499 }
500
501 /*
502  * Essentially the entry point to the journalling code. When bcachefs is doing
503  * a btree insert, it calls this function to get the current journal write.
504  * The journal write is the structure used to set up journal writes. The calling
505  * function will then add its keys to the structure, queuing them for the next
506  * write.
507  *
508  * To ensure forward progress, the current task must not be holding any
509  * btree node write locks.
510  */
511 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
512                                   unsigned flags)
513 {
514         int ret;
515
516         closure_wait_event(&j->async_wait,
517                    (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
518                    (flags & JOURNAL_RES_GET_NONBLOCK));
519         return ret;
520 }
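/*
 * Sketch of the reservation lifecycle this slowpath serves (illustrative, not
 * taken from the btree update path that actually drives it):
 *
 *	struct journal_res res = { 0 };
 *
 *	ret = bch2_journal_res_get(j, &res, jset_u64s(u64s), 0);
 *	if (ret)
 *		return ret;
 *	(copy keys into the reserved space in the current journal buf)
 *	bch2_journal_res_put(j, &res);
 *
 * The inline bch2_journal_res_get() wrapper is expected to try
 * journal_res_get_fast() first and only fall back to
 * bch2_journal_res_get_slowpath() when the current entry is full, closed or
 * errored.
 */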
521
522 /* journal_preres: */
523
524 static bool journal_preres_available(struct journal *j,
525                                      struct journal_preres *res,
526                                      unsigned new_u64s,
527                                      unsigned flags)
528 {
529         bool ret = bch2_journal_preres_get_fast(j, res, new_u64s, flags, true);
530
531         if (!ret && mutex_trylock(&j->reclaim_lock)) {
532                 bch2_journal_reclaim(j);
533                 mutex_unlock(&j->reclaim_lock);
534         }
535
536         return ret;
537 }
538
539 int __bch2_journal_preres_get(struct journal *j,
540                               struct journal_preres *res,
541                               unsigned new_u64s,
542                               unsigned flags)
543 {
544         int ret;
545
546         closure_wait_event(&j->preres_wait,
547                    (ret = bch2_journal_error(j)) ||
548                    journal_preres_available(j, res, new_u64s, flags));
549         return ret;
550 }
551
552 /* journal_entry_res: */
553
554 void bch2_journal_entry_res_resize(struct journal *j,
555                                    struct journal_entry_res *res,
556                                    unsigned new_u64s)
557 {
558         union journal_res_state state;
559         int d = new_u64s - res->u64s;
560
561         spin_lock(&j->lock);
562
563         j->entry_u64s_reserved += d;
564         if (d <= 0)
565                 goto out;
566
567         j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
568         smp_mb();
569         state = READ_ONCE(j->reservations);
570
571         if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
572             state.cur_entry_offset > j->cur_entry_u64s) {
573                 j->cur_entry_u64s += d;
574                 /*
575                  * Not enough room in current journal entry, have to flush it:
576                  */
577                 __journal_entry_close(j);
578         } else {
579                 journal_cur_buf(j)->u64s_reserved += d;
580         }
581 out:
582         spin_unlock(&j->lock);
583         res->u64s += d;
584 }
585
586 /* journal flushing: */
587
588 /**
589  * bch2_journal_flush_seq_async - wait for a journal entry to be written
590  *
591  * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
592  * necessary
593  */
594 int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
595                                  struct closure *parent)
596 {
597         struct journal_buf *buf;
598         int ret = 0;
599
600         if (seq <= j->flushed_seq_ondisk)
601                 return 1;
602
603         spin_lock(&j->lock);
604
605         BUG_ON(seq > journal_cur_seq(j));
606
607         /* Recheck under lock: */
608         if (j->err_seq && seq >= j->err_seq) {
609                 ret = -EIO;
610                 goto out;
611         }
612
613         if (seq <= j->flushed_seq_ondisk) {
614                 ret = 1;
615                 goto out;
616         }
617
618         /* if seq was written, but not flushed - flush a newer one instead */
619         seq = max(seq, last_unwritten_seq(j));
620
621 recheck_need_open:
622         if (seq == journal_cur_seq(j) && !journal_entry_is_open(j)) {
623                 struct journal_res res = { 0 };
624
625                 spin_unlock(&j->lock);
626
627                 ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
628                 if (ret)
629                         return ret;
630
631                 seq = res.seq;
632                 buf = j->buf + (seq & JOURNAL_BUF_MASK);
633                 buf->must_flush = true;
634                 set_bit(JOURNAL_NEED_WRITE, &j->flags);
635
636                 if (parent && !closure_wait(&buf->wait, parent))
637                         BUG();
638
639                 bch2_journal_res_put(j, &res);
640
641                 spin_lock(&j->lock);
642                 goto want_write;
643         }
644
645         /*
646          * if write was kicked off without a flush, flush the next sequence
647          * number instead
648          */
649         buf = journal_seq_to_buf(j, seq);
650         if (buf->noflush) {
651                 seq++;
652                 goto recheck_need_open;
653         }
654
655         buf->must_flush = true;
656
657         if (parent && !closure_wait(&buf->wait, parent))
658                 BUG();
659 want_write:
660         if (seq == journal_cur_seq(j))
661                 journal_entry_want_write(j);
662 out:
663         spin_unlock(&j->lock);
664         return ret;
665 }
666
667 int bch2_journal_flush_seq(struct journal *j, u64 seq)
668 {
669         u64 start_time = local_clock();
670         int ret, ret2;
671
672         ret = wait_event_interruptible(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));
673
674         if (!ret)
675                 bch2_time_stats_update(j->flush_seq_time, start_time);
676
677         return ret ?: ret2 < 0 ? ret2 : 0;
678 }
679
680 int bch2_journal_meta(struct journal *j)
681 {
682         struct journal_res res;
683         int ret;
684
685         memset(&res, 0, sizeof(res));
686
687         ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
688         if (ret)
689                 return ret;
690
691         bch2_journal_res_put(j, &res);
692
693         return bch2_journal_flush_seq(j, res.seq);
694 }
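/*
 * Note: the empty reservation above (jset_u64s(0)) just forces a journal entry
 * to exist for res.seq; flushing that seq then guarantees everything written
 * to the journal so far is on disk - see bch2_fs_journal_stop() below, which
 * relies on this to get the clock hands written out.
 */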
695
696 /*
697  * bch2_journal_flush_async - if there is an open journal entry, or a journal
698  * still being written, write it and wait for the write to complete
699  */
700 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
701 {
702         u64 seq, journal_seq;
703
704         spin_lock(&j->lock);
705         journal_seq = journal_cur_seq(j);
706
707         if (journal_entry_is_open(j)) {
708                 seq = journal_seq;
709         } else if (journal_seq) {
710                 seq = journal_seq - 1;
711         } else {
712                 spin_unlock(&j->lock);
713                 return;
714         }
715         spin_unlock(&j->lock);
716
717         bch2_journal_flush_seq_async(j, seq, parent);
718 }
719
720 int bch2_journal_flush(struct journal *j)
721 {
722         u64 seq, journal_seq;
723
724         spin_lock(&j->lock);
725         journal_seq = journal_cur_seq(j);
726
727         if (journal_entry_is_open(j)) {
728                 seq = journal_seq;
729         } else if (journal_seq) {
730                 seq = journal_seq - 1;
731         } else {
732                 spin_unlock(&j->lock);
733                 return 0;
734         }
735         spin_unlock(&j->lock);
736
737         return bch2_journal_flush_seq(j, seq);
738 }
739
740 /* block/unblock the journal: */
741
742 void bch2_journal_unblock(struct journal *j)
743 {
744         spin_lock(&j->lock);
745         j->blocked--;
746         spin_unlock(&j->lock);
747
748         journal_wake(j);
749 }
750
751 void bch2_journal_block(struct journal *j)
752 {
753         spin_lock(&j->lock);
754         j->blocked++;
755         spin_unlock(&j->lock);
756
757         journal_quiesce(j);
758 }
759
760 /* allocate journal on a device: */
761
762 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
763                                          bool new_fs, struct closure *cl)
764 {
765         struct bch_fs *c = ca->fs;
766         struct journal_device *ja = &ca->journal;
767         struct bch_sb_field_journal *journal_buckets;
768         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
769         int ret = 0;
770
771         /* don't handle reducing nr of buckets yet: */
772         if (nr <= ja->nr)
773                 return 0;
774
775         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
776         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
777         if (!new_buckets || !new_bucket_seq) {
778                 ret = -ENOMEM;
779                 goto err;
780         }
781
782         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
783                                         nr + sizeof(*journal_buckets) / sizeof(u64));
784         if (!journal_buckets) {
785                 ret = -ENOSPC;
786                 goto err;
787         }
788
789         /*
790          * We may be called from the device add path, before the new device has
791          * actually been added to the running filesystem:
792          */
793         if (!new_fs)
794                 spin_lock(&c->journal.lock);
795
796         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
797         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
798         swap(new_buckets,       ja->buckets);
799         swap(new_bucket_seq,    ja->bucket_seq);
800
801         if (!new_fs)
802                 spin_unlock(&c->journal.lock);
803
804         while (ja->nr < nr) {
805                 struct open_bucket *ob = NULL;
806                 unsigned pos;
807                 long b;
808
809                 if (new_fs) {
810                         b = bch2_bucket_alloc_new_fs(ca);
811                         if (b < 0) {
812                                 ret = -ENOSPC;
813                                 goto err;
814                         }
815                 } else {
816                         rcu_read_lock();
817                         ob = bch2_bucket_alloc(c, ca, RESERVE_NONE,
818                                                false, cl);
819                         rcu_read_unlock();
820                         if (IS_ERR(ob)) {
821                                 ret = cl ? -EAGAIN : -ENOSPC;
822                                 goto err;
823                         }
824
825                         b = sector_to_bucket(ca, ob->ptr.offset);
826                 }
827
828                 if (c)
829                         spin_lock(&c->journal.lock);
830
831                 /*
832                  * XXX
833                  * For resize at runtime, we should be writing the new
834                  * superblock before inserting into the journal array
835                  */
836
837                 pos = ja->nr ? (ja->cur_idx + 1) % ja->nr : 0;
838                 __array_insert_item(ja->buckets,                ja->nr, pos);
839                 __array_insert_item(ja->bucket_seq,             ja->nr, pos);
840                 __array_insert_item(journal_buckets->buckets,   ja->nr, pos);
841                 ja->nr++;
842
843                 ja->buckets[pos] = b;
844                 ja->bucket_seq[pos] = 0;
845                 journal_buckets->buckets[pos] = cpu_to_le64(b);
846
847                 if (pos <= ja->discard_idx)
848                         ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
849                 if (pos <= ja->dirty_idx_ondisk)
850                         ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
851                 if (pos <= ja->dirty_idx)
852                         ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
853                 if (pos <= ja->cur_idx)
854                         ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
855
856                 if (c)
857                         spin_unlock(&c->journal.lock);
858
859                 if (new_fs) {
860                         bch2_mark_metadata_bucket(c, ca, b, BCH_DATA_journal,
861                                                   ca->mi.bucket_size,
862                                                   gc_phase(GC_PHASE_SB),
863                                                   0);
864                 } else {
865                         ret = bch2_trans_do(c, NULL, NULL, BTREE_INSERT_NOFAIL,
866                                 bch2_trans_mark_metadata_bucket(&trans, ca,
867                                                 b, BCH_DATA_journal,
868                                                 ca->mi.bucket_size));
869
870                         bch2_open_bucket_put(c, ob);
871
872                         if (ret)
873                                 goto err;
874                 }
875         }
876 err:
877         bch2_sb_resize_journal(&ca->disk_sb,
878                 ja->nr + sizeof(*journal_buckets) / sizeof(u64));
879         kfree(new_bucket_seq);
880         kfree(new_buckets);
881
882         return ret;
883 }
884
885 /*
886  * Allocate more journal space at runtime - not currently making use of it, but
887  * the code works:
888  */
889 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
890                                 unsigned nr)
891 {
892         struct journal_device *ja = &ca->journal;
893         struct closure cl;
894         unsigned current_nr;
895         int ret;
896
897         closure_init_stack(&cl);
898
899         do {
900                 struct disk_reservation disk_res = { 0, 0 };
901
902                 closure_sync(&cl);
903
904                 mutex_lock(&c->sb_lock);
905                 current_nr = ja->nr;
906
907                 /*
908                  * note: journal buckets aren't really counted as _sectors_ used yet, so
909                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
910                  * when space used goes up without a reservation - but we do need the
911                  * reservation to ensure we'll actually be able to allocate:
912                  */
913
914                 if (bch2_disk_reservation_get(c, &disk_res,
915                                               bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
916                         mutex_unlock(&c->sb_lock);
917                         return -ENOSPC;
918                 }
919
920                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
921
922                 bch2_disk_reservation_put(c, &disk_res);
923
924                 if (ja->nr != current_nr)
925                         bch2_write_super(c);
926                 mutex_unlock(&c->sb_lock);
927         } while (ret == -EAGAIN);
928
929         return ret;
930 }
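/*
 * e.g. (illustrative) a runtime resize would boil down to:
 *
 *	ret = bch2_set_nr_journal_buckets(c, ca, ca->journal.nr + nr_extra);
 *
 * where nr_extra is hypothetical; the loop above keeps retrying while bucket
 * allocation returns -EAGAIN waiting on the closure.
 */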
931
932 int bch2_dev_journal_alloc(struct bch_dev *ca)
933 {
934         unsigned nr;
935
936         if (dynamic_fault("bcachefs:add:journal_alloc"))
937                 return -ENOMEM;
938
939         /* 1/128th of the device by default: */
940         nr = ca->mi.nbuckets >> 7;
941
942         /*
943          * clamp journal size to 8192 buckets or 8GB (in sectors), whichever
944          * is smaller:
945          */
946         nr = clamp_t(unsigned, nr,
947                      BCH_JOURNAL_BUCKETS_MIN,
948                      min(1 << 13,
949                          (1 << 24) / ca->mi.bucket_size));
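	/*
	 * Worked example (illustrative): a device with 2^20 buckets of 1024
	 * sectors (512k) gives nr = 2^20 >> 7 = 8192; the upper clamp is
	 * min(1 << 13, (1 << 24) / 1024) = 8192, so we end up with 8192
	 * journal buckets (4G).
	 */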
950
951         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
952 }
953
954 /* startup/shutdown: */
955
956 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
957 {
958         union journal_res_state state;
959         bool ret = false;
960         unsigned i;
961
962         spin_lock(&j->lock);
963         state = READ_ONCE(j->reservations);
964         i = state.idx;
965
966         while (i != state.unwritten_idx) {
967                 i = (i - 1) & JOURNAL_BUF_MASK;
968                 if (bch2_bkey_has_device(bkey_i_to_s_c(&j->buf[i].key), dev_idx))
969                         ret = true;
970         }
971         spin_unlock(&j->lock);
972
973         return ret;
974 }
975
976 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
977 {
978         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
979 }
980
981 void bch2_fs_journal_stop(struct journal *j)
982 {
983         bch2_journal_flush_all_pins(j);
984
985         wait_event(j->wait, journal_entry_close(j));
986
987         /*
988          * Always write a new journal entry, to make sure the clock hands are up
989          * to date (and match the superblock)
990          */
991         bch2_journal_meta(j);
992
993         journal_quiesce(j);
994
995         BUG_ON(!bch2_journal_error(j) &&
996                test_bit(JOURNAL_REPLAY_DONE, &j->flags) &&
997                (journal_entry_is_open(j) ||
998                 j->last_empty_seq + 1 != journal_cur_seq(j)));
999
1000         cancel_delayed_work_sync(&j->write_work);
1001         bch2_journal_reclaim_stop(j);
1002 }
1003
1004 int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
1005                           struct list_head *journal_entries)
1006 {
1007         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1008         struct journal_entry_pin_list *p;
1009         struct journal_replay *i;
1010         u64 last_seq = cur_seq, nr, seq;
1011
1012         if (!list_empty(journal_entries))
1013                 last_seq = le64_to_cpu(list_last_entry(journal_entries,
1014                                 struct journal_replay, list)->j.last_seq);
1015
1016         nr = cur_seq - last_seq;
1017
1018         if (nr + 1 > j->pin.size) {
1019                 free_fifo(&j->pin);
1020                 init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
1021                 if (!j->pin.data) {
1022                         bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
1023                         return -ENOMEM;
1024                 }
1025         }
1026
1027         j->replay_journal_seq   = last_seq;
1028         j->replay_journal_seq_end = cur_seq;
1029         j->last_seq_ondisk      = last_seq;
1030         j->pin.front            = last_seq;
1031         j->pin.back             = cur_seq;
1032         atomic64_set(&j->seq, cur_seq - 1);
1033
1034         fifo_for_each_entry_ptr(p, &j->pin, seq)
1035                 journal_pin_list_init(p, 1);
1036
1037         list_for_each_entry(i, journal_entries, list) {
1038                 unsigned ptr;
1039
1040                 seq = le64_to_cpu(i->j.seq);
1041                 BUG_ON(seq >= cur_seq);
1042
1043                 if (seq < last_seq)
1044                         continue;
1045
1046                 p = journal_seq_pin(j, seq);
1047
1048                 p->devs.nr = 0;
1049                 for (ptr = 0; ptr < i->nr_ptrs; ptr++)
1050                         bch2_dev_list_add_dev(&p->devs, i->ptrs[ptr].dev);
1051         }
1052
1053         spin_lock(&j->lock);
1054
1055         set_bit(JOURNAL_STARTED, &j->flags);
1056         j->last_flush_write = jiffies;
1057
1058         journal_pin_new_entry(j);
1059
1060         j->reservations.idx = j->reservations.unwritten_idx = journal_cur_seq(j);
1061
1062         bch2_journal_buf_init(j);
1063
1064         c->last_bucket_seq_cleanup = journal_cur_seq(j);
1065
1066         bch2_journal_space_available(j);
1067         spin_unlock(&j->lock);
1068
1069         return 0;
1070 }
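/*
 * Example of the startup state set up above (illustrative): replaying with
 * cur_seq == 100 and last_seq == 95 gives nr == 5; the pin fifo then covers
 * seqs [95, 100), j->seq is primed to 99, and journal_pin_new_entry()
 * advances it to 100 and adds the pin for the first entry the running
 * filesystem will write.
 */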
1071
1072 /* init/exit: */
1073
1074 void bch2_dev_journal_exit(struct bch_dev *ca)
1075 {
1076         kfree(ca->journal.bio);
1077         kfree(ca->journal.buckets);
1078         kfree(ca->journal.bucket_seq);
1079
1080         ca->journal.bio         = NULL;
1081         ca->journal.buckets     = NULL;
1082         ca->journal.bucket_seq  = NULL;
1083 }
1084
1085 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
1086 {
1087         struct journal_device *ja = &ca->journal;
1088         struct bch_sb_field_journal *journal_buckets =
1089                 bch2_sb_get_journal(sb);
1090         unsigned i;
1091
1092         ja->nr = bch2_nr_journal_buckets(journal_buckets);
1093
1094         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1095         if (!ja->bucket_seq)
1096                 return -ENOMEM;
1097
1098         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
1099                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
1100         if (!ca->journal.bio)
1101                 return -ENOMEM;
1102
1103         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1104         if (!ja->buckets)
1105                 return -ENOMEM;
1106
1107         for (i = 0; i < ja->nr; i++)
1108                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
1109
1110         return 0;
1111 }
1112
1113 void bch2_fs_journal_exit(struct journal *j)
1114 {
1115         unsigned i;
1116
1117         for (i = 0; i < ARRAY_SIZE(j->buf); i++)
1118                 kvpfree(j->buf[i].data, j->buf[i].buf_size);
1119         free_fifo(&j->pin);
1120 }
1121
1122 int bch2_fs_journal_init(struct journal *j)
1123 {
1124         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1125         static struct lock_class_key res_key;
1126         unsigned i;
1127         int ret = 0;
1128
1129         pr_verbose_init(c->opts, "");
1130
1131         spin_lock_init(&j->lock);
1132         spin_lock_init(&j->err_lock);
1133         init_waitqueue_head(&j->wait);
1134         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
1135         init_waitqueue_head(&j->reclaim_wait);
1136         init_waitqueue_head(&j->pin_flush_wait);
1137         mutex_init(&j->reclaim_lock);
1138         mutex_init(&j->discard_lock);
1139
1140         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
1141
1142         j->write_delay_ms       = 1000;
1143         j->reclaim_delay_ms     = 100;
1144
1145         atomic64_set(&j->reservations.counter,
1146                 ((union journal_res_state)
1147                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
1148
1149         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL))) {
1150                 ret = -ENOMEM;
1151                 goto out;
1152         }
1153
1154         for (i = 0; i < ARRAY_SIZE(j->buf); i++) {
1155                 j->buf[i].buf_size = JOURNAL_ENTRY_SIZE_MIN;
1156                 j->buf[i].data = kvpmalloc(j->buf[i].buf_size, GFP_KERNEL);
1157                 if (!j->buf[i].data) {
1158                         ret = -ENOMEM;
1159                         goto out;
1160                 }
1161         }
1162
1163         j->pin.front = j->pin.back = 1;
1164 out:
1165         pr_verbose_init(c->opts, "ret %i", ret);
1166         return ret;
1167 }
1168
1169 /* debug: */
1170
1171 void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
1172 {
1173         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1174         union journal_res_state s;
1175         struct bch_dev *ca;
1176         unsigned i;
1177
1178         rcu_read_lock();
1179         s = READ_ONCE(j->reservations);
1180
1181         pr_buf(out,
1182                "active journal entries:\t%llu\n"
1183                "seq:\t\t\t%llu\n"
1184                "last_seq:\t\t%llu\n"
1185                "last_seq_ondisk:\t%llu\n"
1186                "flushed_seq_ondisk:\t%llu\n"
1187                "prereserved:\t\t%u/%u\n"
1188                "each entry reserved:\t%u\n"
1189                "nr flush writes:\t%llu\n"
1190                "nr noflush writes:\t%llu\n"
1191                "nr direct reclaim:\t%llu\n"
1192                "nr background reclaim:\t%llu\n"
1193                "reclaim kicked:\t\t%u\n"
1194                "reclaim runs in:\t%u ms\n"
1195                "current entry sectors:\t%u\n"
1196                "current entry error:\t%u\n"
1197                "current entry:\t\t",
1198                fifo_used(&j->pin),
1199                journal_cur_seq(j),
1200                journal_last_seq(j),
1201                j->last_seq_ondisk,
1202                j->flushed_seq_ondisk,
1203                j->prereserved.reserved,
1204                j->prereserved.remaining,
1205                j->entry_u64s_reserved,
1206                j->nr_flush_writes,
1207                j->nr_noflush_writes,
1208                j->nr_direct_reclaim,
1209                j->nr_background_reclaim,
1210                j->reclaim_kicked,
1211                jiffies_to_msecs(j->next_reclaim - jiffies),
1212                j->cur_entry_sectors,
1213                j->cur_entry_error);
1214
1215         switch (s.cur_entry_offset) {
1216         case JOURNAL_ENTRY_ERROR_VAL:
1217                 pr_buf(out, "error\n");
1218                 break;
1219         case JOURNAL_ENTRY_CLOSED_VAL:
1220                 pr_buf(out, "closed\n");
1221                 break;
1222         default:
1223                 pr_buf(out, "%u/%u\n",
1224                        s.cur_entry_offset,
1225                        j->cur_entry_u64s);
1226                 break;
1227         }
1228
1229         pr_buf(out,
1230                "current entry:\t\tidx %u refcount %u\n",
1231                s.idx, journal_state_count(s, s.idx));
1232
1233         i = s.idx;
1234         while (i != s.unwritten_idx) {
1235                 i = (i - 1) & JOURNAL_BUF_MASK;
1236
1237                 pr_buf(out, "unwritten entry:\tidx %u refcount %u sectors %u\n",
1238                        i, journal_state_count(s, i), j->buf[i].sectors);
1239         }
1240
1241         pr_buf(out,
1242                "need write:\t\t%i\n"
1243                "replay done:\t\t%i\n",
1244                test_bit(JOURNAL_NEED_WRITE,     &j->flags),
1245                test_bit(JOURNAL_REPLAY_DONE,    &j->flags));
1246
1247         pr_buf(out, "space:\n");
1248         pr_buf(out, "\tdiscarded\t%u:%u\n",
1249                j->space[journal_space_discarded].next_entry,
1250                j->space[journal_space_discarded].total);
1251         pr_buf(out, "\tclean ondisk\t%u:%u\n",
1252                j->space[journal_space_clean_ondisk].next_entry,
1253                j->space[journal_space_clean_ondisk].total);
1254         pr_buf(out, "\tclean\t\t%u:%u\n",
1255                j->space[journal_space_clean].next_entry,
1256                j->space[journal_space_clean].total);
1257         pr_buf(out, "\ttotal\t\t%u:%u\n",
1258                j->space[journal_space_total].next_entry,
1259                j->space[journal_space_total].total);
1260
1261         for_each_member_device_rcu(ca, c, i,
1262                                    &c->rw_devs[BCH_DATA_journal]) {
1263                 struct journal_device *ja = &ca->journal;
1264
1265                 if (!test_bit(ca->dev_idx, c->rw_devs[BCH_DATA_journal].d))
1266                         continue;
1267
1268                 if (!ja->nr)
1269                         continue;
1270
1271                 pr_buf(out,
1272                        "dev %u:\n"
1273                        "\tnr\t\t%u\n"
1274                        "\tbucket size\t%u\n"
1275                        "\tavailable\t%u:%u\n"
1276                        "\tdiscard_idx\t%u\n"
1277                        "\tdirty_ondisk\t%u (seq %llu)\n"
1278                        "\tdirty_idx\t%u (seq %llu)\n"
1279                        "\tcur_idx\t\t%u (seq %llu)\n",
1280                        i, ja->nr, ca->mi.bucket_size,
1281                        bch2_journal_dev_buckets_available(j, ja, journal_space_discarded),
1282                        ja->sectors_free,
1283                        ja->discard_idx,
1284                        ja->dirty_idx_ondisk,    ja->bucket_seq[ja->dirty_idx_ondisk],
1285                        ja->dirty_idx,           ja->bucket_seq[ja->dirty_idx],
1286                        ja->cur_idx,             ja->bucket_seq[ja->cur_idx]);
1287         }
1288
1289         rcu_read_unlock();
1290 }
1291
1292 void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
1293 {
1294         spin_lock(&j->lock);
1295         __bch2_journal_debug_to_text(out, j);
1296         spin_unlock(&j->lock);
1297 }
1298
1299 void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
1300 {
1301         struct journal_entry_pin_list *pin_list;
1302         struct journal_entry_pin *pin;
1303         u64 i;
1304
1305         spin_lock(&j->lock);
1306         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
1307                 pr_buf(out, "%llu: count %u\n",
1308                        i, atomic_read(&pin_list->count));
1309
1310                 list_for_each_entry(pin, &pin_list->list, list)
1311                         pr_buf(out, "\t%px %ps\n",
1312                                pin, pin->flush);
1313
1314                 if (!list_empty(&pin_list->flushed))
1315                         pr_buf(out, "flushed:\n");
1316
1317                 list_for_each_entry(pin, &pin_list->flushed, list)
1318                         pr_buf(out, "\t%px %ps\n",
1319                                pin, pin->flush);
1320         }
1321         spin_unlock(&j->lock);
1322 }