1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * bcachefs journalling code, for btree insertions
4  *
5  * Copyright 2012 Google, Inc.
6  */
7
8 #include "bcachefs.h"
9 #include "alloc_foreground.h"
10 #include "bkey_methods.h"
11 #include "btree_gc.h"
12 #include "buckets.h"
13 #include "journal.h"
14 #include "journal_io.h"
15 #include "journal_reclaim.h"
16 #include "journal_seq_blacklist.h"
17 #include "super-io.h"
18
19 #include <trace/events/bcachefs.h>
20
21 static inline struct journal_buf *journal_seq_to_buf(struct journal *, u64);
22
23 static bool __journal_entry_is_open(union journal_res_state state)
24 {
25         return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
26 }
27
28 static bool journal_entry_is_open(struct journal *j)
29 {
30         return __journal_entry_is_open(j->reservations);
31 }
32
33 static void journal_pin_new_entry(struct journal *j, int count)
34 {
35         struct journal_entry_pin_list *p;
36
37         /*
38          * The fifo_push() needs to happen at the same time as j->seq is
39          * incremented for journal_last_seq() to be calculated correctly
40          */
41         atomic64_inc(&j->seq);
42         p = fifo_push_ref(&j->pin);
43
44         INIT_LIST_HEAD(&p->list);
45         INIT_LIST_HEAD(&p->flushed);
46         atomic_set(&p->count, count);
47         p->devs.nr = 0;
48 }
49
50 static void bch2_journal_buf_init(struct journal *j)
51 {
52         struct journal_buf *buf = journal_cur_buf(j);
53
54         memset(buf->has_inode, 0, sizeof(buf->has_inode));
55
56         memset(buf->data, 0, sizeof(*buf->data));
57         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
58         buf->data->u64s = 0;
59 }
60
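/*
 * Mark the journal as errored: set cur_entry_offset to
 * JOURNAL_ENTRY_ERROR_VAL so that further reservations fail with -EROFS,
 * then wake up anything waiting on the journal or on the current buffer.
 */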
61 void bch2_journal_halt(struct journal *j)
62 {
63         union journal_res_state old, new;
64         u64 v = atomic64_read(&j->reservations.counter);
65
66         do {
67                 old.v = new.v = v;
68                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
69                         return;
70
71                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
72         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
73                                        old.v, new.v)) != old.v);
74
75         journal_wake(j);
76         closure_wake_up(&journal_cur_buf(j)->wait);
77 }
78
79 /* journal entry close/open: */
80
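/*
 * Called from bch2_journal_buf_put() once the buffer is ready to be
 * written: update the need_write -> write delay stats and kick off the
 * actual write via bch2_journal_write().
 */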
81 void __bch2_journal_buf_put(struct journal *j, bool need_write_just_set)
82 {
83         if (!need_write_just_set &&
84             test_bit(JOURNAL_NEED_WRITE, &j->flags))
85                 bch2_time_stats_update(j->delay_time,
86                                        j->need_write_time);
87
88         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
89
90         closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
91 }
92
93 /*
94  * Returns true if journal entry is now closed:
95  */
96 static bool __journal_entry_close(struct journal *j)
97 {
98         struct bch_fs *c = container_of(j, struct bch_fs, journal);
99         struct journal_buf *buf = journal_cur_buf(j);
100         union journal_res_state old, new;
101         u64 v = atomic64_read(&j->reservations.counter);
102         bool set_need_write = false;
103         unsigned sectors;
104
105         lockdep_assert_held(&j->lock);
106
107         do {
108                 old.v = new.v = v;
109                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
110                         return true;
111
112                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
113                         /* this entry will never be written: */
114                         closure_wake_up(&buf->wait);
115                         return true;
116                 }
117
118                 if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) {
119                         set_bit(JOURNAL_NEED_WRITE, &j->flags);
120                         j->need_write_time = local_clock();
121                         set_need_write = true;
122                 }
123
124                 if (new.prev_buf_unwritten)
125                         return false;
126
127                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
128                 new.idx++;
129                 new.prev_buf_unwritten = 1;
130
131                 BUG_ON(journal_state_count(new, new.idx));
132         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
133                                        old.v, new.v)) != old.v);
134
135         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
136
137         sectors = vstruct_blocks_plus(buf->data, c->block_bits,
138                                       buf->u64s_reserved) << c->block_bits;
139         BUG_ON(sectors > buf->sectors);
140         buf->sectors = sectors;
141
142         bkey_extent_init(&buf->key);
143
144         /*
145          * We have to set last_seq here, _before_ opening a new journal entry:
146          *
147          * A thread may replace an old pin with a new pin on its current
148          * journal reservation - the expectation being that the journal will
149          * contain either what the old pin protected or what the new pin
150          * protects.
151          *
152          * After the old pin is dropped journal_last_seq() won't include the old
153          * pin, so we can only write the updated last_seq on the entry that
154          * contains whatever the new pin protects.
155          *
156          * Restated, we can _not_ update last_seq for a given entry if there
157          * could be a newer entry open with reservations/pins that have been
158          * taken against it.
159          *
160          * Hence, we want update/set last_seq on the current journal entry right
161          * before we open a new one:
162          */
163         buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));
164
165         if (journal_entry_empty(buf->data))
166                 clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
167         else
168                 set_bit(JOURNAL_NOT_EMPTY, &j->flags);
169
170         journal_pin_new_entry(j, 1);
171
172         bch2_journal_buf_init(j);
173
174         cancel_delayed_work(&j->write_work);
175
176         bch2_journal_space_available(j);
177
178         bch2_journal_buf_put(j, old.idx, set_need_write);
179         return true;
180 }
181
182 static bool journal_entry_close(struct journal *j)
183 {
184         bool ret;
185
186         spin_lock(&j->lock);
187         ret = __journal_entry_close(j);
188         spin_unlock(&j->lock);
189
190         return ret;
191 }
192
193 /*
194  * should _only_ be called from journal_res_get() - when we actually want a
195  * journal reservation - journal entry is open means journal is dirty:
196  *
197  * returns:
198  * 0:           success
199  * -ENOSPC:     journal currently full, must invoke reclaim
200  * -EAGAIN:     journal blocked, must wait
201  * -EROFS:      insufficient rw devices or journal error
202  */
203 static int journal_entry_open(struct journal *j)
204 {
205         struct journal_buf *buf = journal_cur_buf(j);
206         union journal_res_state old, new;
207         int u64s;
208         u64 v;
209
210         lockdep_assert_held(&j->lock);
211         BUG_ON(journal_entry_is_open(j));
212
213         if (j->blocked)
214                 return -EAGAIN;
215
216         if (j->cur_entry_error)
217                 return j->cur_entry_error;
218
219         BUG_ON(!j->cur_entry_sectors);
220
221         buf->u64s_reserved      = j->entry_u64s_reserved;
222         buf->disk_sectors       = j->cur_entry_sectors;
223         buf->sectors            = min(buf->disk_sectors, buf->buf_size >> 9);
224
225         u64s = (int) (buf->sectors << 9) / sizeof(u64) -
226                 journal_entry_overhead(j);
227         u64s  = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);
228
229         if (u64s <= le32_to_cpu(buf->data->u64s))
230                 return -ENOSPC;
231
232         /*
233          * Must be set before marking the journal entry as open:
234          */
235         j->cur_entry_u64s = u64s;
236
237         v = atomic64_read(&j->reservations.counter);
238         do {
239                 old.v = new.v = v;
240
241                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
242                         return -EROFS;
243
244                 /* Handle any already added entries */
245                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
246
247                 EBUG_ON(journal_state_count(new, new.idx));
248                 journal_state_inc(&new);
249         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
250                                        old.v, new.v)) != old.v);
251
252         if (j->res_get_blocked_start)
253                 bch2_time_stats_update(j->blocked_time,
254                                        j->res_get_blocked_start);
255         j->res_get_blocked_start = 0;
256
257         mod_delayed_work(system_freezable_wq,
258                          &j->write_work,
259                          msecs_to_jiffies(j->write_delay_ms));
260         journal_wake(j);
261         return 0;
262 }
263
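/*
 * The journal is quiesced when the current entry is closed and the previous
 * buffer has finished writing; journal_quiesce() waits for that state,
 * closing the current entry if necessary.
 */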
264 static bool journal_quiesced(struct journal *j)
265 {
266         union journal_res_state state = READ_ONCE(j->reservations);
267         bool ret = !state.prev_buf_unwritten && !__journal_entry_is_open(state);
268
269         if (!ret)
270                 journal_entry_close(j);
271         return ret;
272 }
273
274 static void journal_quiesce(struct journal *j)
275 {
276         wait_event(j->wait, journal_quiesced(j));
277 }
278
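/*
 * Delayed work scheduled by journal_entry_open() with a j->write_delay_ms
 * delay: closes the current entry so it gets written out, if nothing else
 * has closed it in the meantime.
 */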
279 static void journal_write_work(struct work_struct *work)
280 {
281         struct journal *j = container_of(work, struct journal, write_work.work);
282
283         journal_entry_close(j);
284 }
285
286 /*
287  * Given an inode number, if that inode number has data in the journal that
288  * hasn't yet been flushed, return the journal sequence number that needs to be
289  * flushed:
290  */
291 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
292 {
293         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
294         u64 seq = 0;
295
296         if (!test_bit(h, j->buf[0].has_inode) &&
297             !test_bit(h, j->buf[1].has_inode))
298                 return 0;
299
300         spin_lock(&j->lock);
301         if (test_bit(h, journal_cur_buf(j)->has_inode))
302                 seq = journal_cur_seq(j);
303         else if (test_bit(h, journal_prev_buf(j)->has_inode))
304                 seq = journal_cur_seq(j) - 1;
305         spin_unlock(&j->lock);
306
307         return seq;
308 }
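
/*
 * A minimal fsync-style sketch of how this pairs with
 * bch2_journal_flush_seq() - @c and @inum are stand-ins for the caller's
 * filesystem and inode number, and error handling is omitted:
 *
 *	u64 seq = bch2_inode_journal_seq(&c->journal, inum);
 *
 *	if (seq)
 *		ret = bch2_journal_flush_seq(&c->journal, seq);
 */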
309
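/*
 * Mark journal entry @seq as containing updates for @inode, so that
 * bch2_inode_journal_seq() will report it as needing a flush.
 */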
310 void bch2_journal_set_has_inum(struct journal *j, u64 inode, u64 seq)
311 {
312         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
313         struct journal_buf *buf;
314
315         spin_lock(&j->lock);
316
317         if ((buf = journal_seq_to_buf(j, seq)))
318                 set_bit(h, buf->has_inode);
319
320         spin_unlock(&j->lock);
321 }
322
323 static int __journal_res_get(struct journal *j, struct journal_res *res,
324                              unsigned flags)
325 {
326         struct bch_fs *c = container_of(j, struct bch_fs, journal);
327         struct journal_buf *buf;
328         bool can_discard;
329         int ret;
330 retry:
331         if (journal_res_get_fast(j, res, flags))
332                 return 0;
333
334         if (bch2_journal_error(j))
335                 return -EROFS;
336
337         spin_lock(&j->lock);
338
339         /*
340          * Recheck after taking the lock, so we don't race with another thread
341          * that just did journal_entry_open(), and end up calling
342          * journal_entry_close() unnecessarily
343          */
344         if (journal_res_get_fast(j, res, flags)) {
345                 spin_unlock(&j->lock);
346                 return 0;
347         }
348
349         if (!(flags & JOURNAL_RES_GET_RESERVED) &&
350             !test_bit(JOURNAL_MAY_GET_UNRESERVED, &j->flags)) {
351                 /*
352                  * Don't want to close current journal entry, just need to
353                  * invoke reclaim:
354                  */
355                 ret = -ENOSPC;
356                 goto unlock;
357         }
358
359         /*
360          * If we couldn't get a reservation because the current buf filled up,
361          * and we had room for a bigger entry on disk, signal that we want to
362          * realloc the journal bufs:
363          */
364         buf = journal_cur_buf(j);
365         if (journal_entry_is_open(j) &&
366             buf->buf_size >> 9 < buf->disk_sectors &&
367             buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
368                 j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
369
370         if (journal_entry_is_open(j) &&
371             !__journal_entry_close(j)) {
372                 /*
373                  * We failed to get a reservation on the current open journal
374                  * entry because it's full, and we can't close it because
375                  * there's still a previous one in flight:
376                  */
377                 trace_journal_entry_full(c);
378                 ret = -EAGAIN;
379         } else {
380                 ret = journal_entry_open(j);
381         }
382 unlock:
383         if ((ret == -EAGAIN || ret == -ENOSPC) &&
384             !j->res_get_blocked_start)
385                 j->res_get_blocked_start = local_clock() ?: 1;
386
387         can_discard = j->can_discard;
388         spin_unlock(&j->lock);
389
390         if (!ret)
391                 goto retry;
392
393         if (ret == -ENOSPC) {
394                 WARN_ONCE(!can_discard && (flags & JOURNAL_RES_GET_RESERVED),
395                           "JOURNAL_RES_GET_RESERVED set but journal full");
396
397                 /*
398                  * Journal is full - can't rely on reclaim from work item due to
399                  * freezing:
400                  */
401                 trace_journal_full(c);
402
403                 if (!(flags & JOURNAL_RES_GET_NONBLOCK)) {
404                         if (can_discard) {
405                                 bch2_journal_do_discards(j);
406                                 goto retry;
407                         }
408
409                         if (mutex_trylock(&j->reclaim_lock)) {
410                                 bch2_journal_reclaim(j);
411                                 mutex_unlock(&j->reclaim_lock);
412                         }
413                 }
414
415                 ret = -EAGAIN;
416         }
417
418         return ret;
419 }
420
421 /*
422  * Essentially the entry point to the journaling code. When bcachefs is doing
423  * a btree insert, it calls this function to get the current journal write.
424  * The journal write is the structure used to set up journal writes. The calling
425  * function will then add its keys to the structure, queuing them for the next
426  * write; a minimal usage sketch follows this function.
427  *
428  * To ensure forward progress, the current task must not be holding any
429  * btree node write locks.
430  */
431 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
432                                   unsigned flags)
433 {
434         int ret;
435
436         closure_wait_event(&j->async_wait,
437                    (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
438                    (flags & JOURNAL_RES_GET_NONBLOCK));
439         return ret;
440 }
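
/*
 * A minimal sketch of the reservation interface, modelled on
 * bch2_journal_meta() below - @u64s stands in for however much space the
 * caller needs, and error handling is abbreviated:
 *
 *	struct journal_res res;
 *
 *	memset(&res, 0, sizeof(res));
 *	ret = bch2_journal_res_get(j, &res, jset_u64s(u64s), 0);
 *	if (ret)
 *		return ret;
 *	... copy keys into the journal buffer via the reservation ...
 *	bch2_journal_res_put(j, &res);
 */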
441
442 /* journal_preres: */
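/*
 * Pre-reservations: roughly, these set aside journal space ahead of time
 * (accounted in j->prereserved, visible in the debug output below) so that
 * a later journal reservation for that space can be expected to succeed;
 * when the fast path fails, __bch2_journal_preres_get() kicks journal
 * reclaim and waits.
 */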
443
444 static bool journal_preres_available(struct journal *j,
445                                      struct journal_preres *res,
446                                      unsigned new_u64s,
447                                      unsigned flags)
448 {
449         bool ret = bch2_journal_preres_get_fast(j, res, new_u64s, flags);
450
451         if (!ret)
452                 bch2_journal_reclaim_work(&j->reclaim_work.work);
453
454         return ret;
455 }
456
457 int __bch2_journal_preres_get(struct journal *j,
458                               struct journal_preres *res,
459                               unsigned new_u64s,
460                               unsigned flags)
461 {
462         int ret;
463
464         closure_wait_event(&j->preres_wait,
465                    (ret = bch2_journal_error(j)) ||
466                    journal_preres_available(j, res, new_u64s, flags));
467         return ret;
468 }
469
470 /* journal_entry_res: */
471
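/*
 * An entry_res reserves space in every journal entry, on top of normal
 * reservations - j->entry_u64s_reserved is seeded with space for the btree
 * roots in bch2_fs_journal_init() and adjusted here.
 */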
472 void bch2_journal_entry_res_resize(struct journal *j,
473                                    struct journal_entry_res *res,
474                                    unsigned new_u64s)
475 {
476         union journal_res_state state;
477         int d = new_u64s - res->u64s;
478
479         spin_lock(&j->lock);
480
481         j->entry_u64s_reserved += d;
482         if (d <= 0)
483                 goto out;
484
485         j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
486         smp_mb();
487         state = READ_ONCE(j->reservations);
488
489         if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
490             state.cur_entry_offset > j->cur_entry_u64s) {
491                 j->cur_entry_u64s += d;
492                 /*
493                  * Not enough room in current journal entry, have to flush it:
494                  */
495                 __journal_entry_close(j);
496         } else {
497                 journal_cur_buf(j)->u64s_reserved += d;
498         }
499 out:
500         spin_unlock(&j->lock);
501         res->u64s += d;
502 }
503
504 /* journal flushing: */
505
506 u64 bch2_journal_last_unwritten_seq(struct journal *j)
507 {
508         u64 seq;
509
510         spin_lock(&j->lock);
511         seq = journal_cur_seq(j);
512         if (j->reservations.prev_buf_unwritten)
513                 seq--;
514         spin_unlock(&j->lock);
515
516         return seq;
517 }
518
519 /**
520  * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
521  * open yet, or wait if we cannot
522  *
523  * used by the btree interior update machinery, when it needs to write a new
524  * btree root - every journal entry contains the roots of all the btrees, so it
525  * doesn't need to bother with getting a journal reservation
526  */
527 int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *cl)
528 {
529         struct bch_fs *c = container_of(j, struct bch_fs, journal);
530         int ret;
531
532         spin_lock(&j->lock);
533
534         /*
535          * Can't try to open more than one sequence number ahead:
536          */
537         BUG_ON(journal_cur_seq(j) < seq && !journal_entry_is_open(j));
538
539         if (journal_cur_seq(j) > seq ||
540             journal_entry_is_open(j)) {
541                 spin_unlock(&j->lock);
542                 return 0;
543         }
544
545         if (journal_cur_seq(j) < seq &&
546             !__journal_entry_close(j)) {
547                 /* haven't finished writing out the previous one: */
548                 trace_journal_entry_full(c);
549                 ret = -EAGAIN;
550         } else {
551                 BUG_ON(journal_cur_seq(j) != seq);
552
553                 ret = journal_entry_open(j);
554         }
555
556         if ((ret == -EAGAIN || ret == -ENOSPC) &&
557             !j->res_get_blocked_start)
558                 j->res_get_blocked_start = local_clock() ?: 1;
559
560         if (ret == -EAGAIN || ret == -ENOSPC)
561                 closure_wait(&j->async_wait, cl);
562
563         spin_unlock(&j->lock);
564
565         if (ret == -ENOSPC) {
566                 trace_journal_full(c);
567                 bch2_journal_reclaim_work(&j->reclaim_work.work);
568                 ret = -EAGAIN;
569         }
570
571         return ret;
572 }
573
574 static int journal_seq_error(struct journal *j, u64 seq)
575 {
576         union journal_res_state state = READ_ONCE(j->reservations);
577
578         if (seq == journal_cur_seq(j))
579                 return bch2_journal_error(j);
580
581         if (seq + 1 == journal_cur_seq(j) &&
582             !state.prev_buf_unwritten &&
583             seq > j->seq_ondisk)
584                 return -EIO;
585
586         return 0;
587 }
588
589 static inline struct journal_buf *
590 journal_seq_to_buf(struct journal *j, u64 seq)
591 {
592         /* seq should be for a journal entry that has been opened: */
593         BUG_ON(seq > journal_cur_seq(j));
594         BUG_ON(seq == journal_cur_seq(j) &&
595                j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
596
597         if (seq == journal_cur_seq(j))
598                 return journal_cur_buf(j);
599         if (seq + 1 == journal_cur_seq(j) &&
600             j->reservations.prev_buf_unwritten)
601                 return journal_prev_buf(j);
602         return NULL;
603 }
604
605 /**
606  * bch2_journal_wait_on_seq - wait for a journal entry to be written
607  *
608  * does _not_ cause @seq to be written immediately - if there is no other
609  * activity to cause the relevant journal entry to be filled up or flushed it
610  * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
611  * configurable).
612  */
613 void bch2_journal_wait_on_seq(struct journal *j, u64 seq,
614                               struct closure *parent)
615 {
616         struct journal_buf *buf;
617
618         spin_lock(&j->lock);
619
620         if ((buf = journal_seq_to_buf(j, seq))) {
621                 if (!closure_wait(&buf->wait, parent))
622                         BUG();
623
624                 if (seq == journal_cur_seq(j)) {
625                         smp_mb();
626                         if (bch2_journal_error(j))
627                                 closure_wake_up(&buf->wait);
628                 }
629         }
630
631         spin_unlock(&j->lock);
632 }
633
634 /**
635  * bch2_journal_flush_seq_async - wait for a journal entry to be written
636  *
637  * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
638  * necessary
639  */
640 void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
641                                   struct closure *parent)
642 {
643         struct journal_buf *buf;
644
645         spin_lock(&j->lock);
646
647         if (parent &&
648             (buf = journal_seq_to_buf(j, seq)))
649                 if (!closure_wait(&buf->wait, parent))
650                         BUG();
651
652         if (seq == journal_cur_seq(j))
653                 __journal_entry_close(j);
654         spin_unlock(&j->lock);
655 }
656
657 static int journal_seq_flushed(struct journal *j, u64 seq)
658 {
659         int ret;
660
661         spin_lock(&j->lock);
662         ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq);
663
664         if (seq == journal_cur_seq(j))
665                 __journal_entry_close(j);
666         spin_unlock(&j->lock);
667
668         return ret;
669 }
670
671 int bch2_journal_flush_seq(struct journal *j, u64 seq)
672 {
673         u64 start_time = local_clock();
674         int ret, ret2;
675
676         ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
677
678         bch2_time_stats_update(j->flush_seq_time, start_time);
679
680         return ret ?: ret2 < 0 ? ret2 : 0;
681 }
682
683 /**
684  * bch2_journal_meta_async - force a journal entry to be written
685  */
686 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
687 {
688         struct journal_res res;
689
690         memset(&res, 0, sizeof(res));
691
692         bch2_journal_res_get(j, &res, jset_u64s(0), 0);
693         bch2_journal_res_put(j, &res);
694
695         bch2_journal_flush_seq_async(j, res.seq, parent);
696 }
697
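/*
 * bch2_journal_meta - synchronous version of the above: force a (possibly
 * empty) journal entry to be written, and wait for it to hit disk.
 */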
698 int bch2_journal_meta(struct journal *j)
699 {
700         struct journal_res res;
701         int ret;
702
703         memset(&res, 0, sizeof(res));
704
705         ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
706         if (ret)
707                 return ret;
708
709         bch2_journal_res_put(j, &res);
710
711         return bch2_journal_flush_seq(j, res.seq);
712 }
713
714 /*
715  * bch2_journal_flush_async - if there is an open journal entry, or a journal
716  * entry still being written, write it and wait for the write to complete
717  */
718 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
719 {
720         u64 seq, journal_seq;
721
722         spin_lock(&j->lock);
723         journal_seq = journal_cur_seq(j);
724
725         if (journal_entry_is_open(j)) {
726                 seq = journal_seq;
727         } else if (journal_seq) {
728                 seq = journal_seq - 1;
729         } else {
730                 spin_unlock(&j->lock);
731                 return;
732         }
733         spin_unlock(&j->lock);
734
735         bch2_journal_flush_seq_async(j, seq, parent);
736 }
737
738 int bch2_journal_flush(struct journal *j)
739 {
740         u64 seq, journal_seq;
741
742         spin_lock(&j->lock);
743         journal_seq = journal_cur_seq(j);
744
745         if (journal_entry_is_open(j)) {
746                 seq = journal_seq;
747         } else if (journal_seq) {
748                 seq = journal_seq - 1;
749         } else {
750                 spin_unlock(&j->lock);
751                 return 0;
752         }
753         spin_unlock(&j->lock);
754
755         return bch2_journal_flush_seq(j, seq);
756 }
757
758 /* block/unblock the journal: */
759
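/*
 * bch2_journal_block() bumps j->blocked, which makes journal_entry_open()
 * fail with -EAGAIN, then waits for the journal to quiesce;
 * bch2_journal_unblock() drops the count and wakes anything waiting on a
 * reservation.
 */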
760 void bch2_journal_unblock(struct journal *j)
761 {
762         spin_lock(&j->lock);
763         j->blocked--;
764         spin_unlock(&j->lock);
765
766         journal_wake(j);
767 }
768
769 void bch2_journal_block(struct journal *j)
770 {
771         spin_lock(&j->lock);
772         j->blocked++;
773         spin_unlock(&j->lock);
774
775         journal_quiesce(j);
776 }
777
778 /* allocate journal on a device: */
779
780 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
781                                          bool new_fs, struct closure *cl)
782 {
783         struct bch_fs *c = ca->fs;
784         struct journal_device *ja = &ca->journal;
785         struct bch_sb_field_journal *journal_buckets;
786         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
787         int ret = 0;
788
789         /* don't handle reducing nr of buckets yet: */
790         if (nr <= ja->nr)
791                 return 0;
792
793         ret = -ENOMEM;
794         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
795         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
796         if (!new_buckets || !new_bucket_seq)
797                 goto err;
798
799         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
800                                                  nr + sizeof(*journal_buckets) / sizeof(u64));
801         if (!journal_buckets)
802                 goto err;
803
804         /*
805          * We may be called from the device add path, before the new device has
806          * actually been added to the running filesystem:
807          */
808         if (c)
809                 spin_lock(&c->journal.lock);
810
811         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
812         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
813         swap(new_buckets,       ja->buckets);
814         swap(new_bucket_seq,    ja->bucket_seq);
815
816         if (c)
817                 spin_unlock(&c->journal.lock);
818
819         while (ja->nr < nr) {
820                 struct open_bucket *ob = NULL;
821                 unsigned pos;
822                 long bucket;
823
824                 if (new_fs) {
825                         bucket = bch2_bucket_alloc_new_fs(ca);
826                         if (bucket < 0) {
827                                 ret = -ENOSPC;
828                                 goto err;
829                         }
830                 } else {
831                         ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC,
832                                                false, cl);
833                         if (IS_ERR(ob)) {
834                                 ret = cl ? -EAGAIN : -ENOSPC;
835                                 goto err;
836                         }
837
838                         bucket = sector_to_bucket(ca, ob->ptr.offset);
839                 }
840
841                 if (c) {
842                         percpu_down_read(&c->mark_lock);
843                         spin_lock(&c->journal.lock);
844                 }
845
846                 pos = ja->nr ? (ja->cur_idx + 1) % ja->nr : 0;
847                 __array_insert_item(ja->buckets,                ja->nr, pos);
848                 __array_insert_item(ja->bucket_seq,             ja->nr, pos);
849                 __array_insert_item(journal_buckets->buckets,   ja->nr, pos);
850                 ja->nr++;
851
852                 ja->buckets[pos] = bucket;
853                 ja->bucket_seq[pos] = 0;
854                 journal_buckets->buckets[pos] = cpu_to_le64(bucket);
855
856                 if (pos <= ja->discard_idx)
857                         ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
858                 if (pos <= ja->dirty_idx_ondisk)
859                         ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
860                 if (pos <= ja->dirty_idx)
861                         ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
862                 if (pos <= ja->cur_idx)
863                         ja->cur_idx = (ja->cur_idx + 1) % ja->nr;
864
865                 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_journal,
866                                           ca->mi.bucket_size,
867                                           gc_phase(GC_PHASE_SB),
868                                           0);
869
870                 if (c) {
871                         spin_unlock(&c->journal.lock);
872                         percpu_up_read(&c->mark_lock);
873                 }
874
875                 if (!new_fs)
876                         bch2_open_bucket_put(c, ob);
877         }
878
879         ret = 0;
880 err:
881         kfree(new_bucket_seq);
882         kfree(new_buckets);
883
884         return ret;
885 }
886
887 /*
888  * Allocate more journal space at runtime - not currently making use of it, but
889  * the code works:
890  */
891 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
892                                 unsigned nr)
893 {
894         struct journal_device *ja = &ca->journal;
895         struct closure cl;
896         unsigned current_nr;
897         int ret;
898
899         closure_init_stack(&cl);
900
901         do {
902                 struct disk_reservation disk_res = { 0, 0 };
903
904                 closure_sync(&cl);
905
906                 mutex_lock(&c->sb_lock);
907                 current_nr = ja->nr;
908
909                 /*
910                  * note: journal buckets aren't really counted as _sectors_ used yet, so
911                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
912                  * when space used goes up without a reservation - but we do need the
913                  * reservation to ensure we'll actually be able to allocate:
914                  */
915
916                 if (bch2_disk_reservation_get(c, &disk_res,
917                                               bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
918                         mutex_unlock(&c->sb_lock);
919                         return -ENOSPC;
920                 }
921
922                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
923
924                 bch2_disk_reservation_put(c, &disk_res);
925
926                 if (ja->nr != current_nr)
927                         bch2_write_super(c);
928                 mutex_unlock(&c->sb_lock);
929         } while (ret == -EAGAIN);
930
931         return ret;
932 }
933
934 int bch2_dev_journal_alloc(struct bch_dev *ca)
935 {
936         unsigned nr;
937
938         if (dynamic_fault("bcachefs:add:journal_alloc"))
939                 return -ENOMEM;
940
941         /*
942          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
943          * is smaller:
944          */
945         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
946                      BCH_JOURNAL_BUCKETS_MIN,
947                      min(1 << 10,
948                          (1 << 20) / ca->mi.bucket_size));
949
950         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
951 }
952
953 /* startup/shutdown: */
954
955 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
956 {
957         union journal_res_state state;
958         struct journal_buf *w;
959         bool ret;
960
961         spin_lock(&j->lock);
962         state = READ_ONCE(j->reservations);
963         w = j->buf + !state.idx;
964
965         ret = state.prev_buf_unwritten &&
966                 bch2_bkey_has_device(bkey_i_to_s_c(&w->key), dev_idx);
967         spin_unlock(&j->lock);
968
969         return ret;
970 }
971
972 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
973 {
974         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
975 }
976
977 void bch2_fs_journal_stop(struct journal *j)
978 {
979         bch2_journal_flush_all_pins(j);
980
981         wait_event(j->wait, journal_entry_close(j));
982
983         /* do we need to write another journal entry? */
984         if (test_bit(JOURNAL_NOT_EMPTY, &j->flags))
985                 bch2_journal_meta(j);
986
987         journal_quiesce(j);
988
989         BUG_ON(!bch2_journal_error(j) &&
990                test_bit(JOURNAL_NOT_EMPTY, &j->flags));
991
992         cancel_delayed_work_sync(&j->write_work);
993         cancel_delayed_work_sync(&j->reclaim_work);
994 }
995
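/*
 * Set up the journal's in-memory state from the entries read during
 * recovery: resize the pin fifo if needed, initialize a pin list for each
 * sequence number from last_seq onwards, and set up the first new entry at
 * @cur_seq.
 */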
996 int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
997                           struct list_head *journal_entries)
998 {
999         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1000         struct journal_entry_pin_list *p;
1001         struct journal_replay *i;
1002         u64 last_seq = cur_seq, nr, seq;
1003
1004         if (!list_empty(journal_entries))
1005                 last_seq = le64_to_cpu(list_last_entry(journal_entries,
1006                                 struct journal_replay, list)->j.last_seq);
1007
1008         nr = cur_seq - last_seq;
1009
1010         if (nr + 1 > j->pin.size) {
1011                 free_fifo(&j->pin);
1012                 init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
1013                 if (!j->pin.data) {
1014                         bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
1015                         return -ENOMEM;
1016                 }
1017         }
1018
1019         j->replay_journal_seq   = last_seq;
1020         j->replay_journal_seq_end = cur_seq;
1021         j->last_seq_ondisk      = last_seq;
1022         j->pin.front            = last_seq;
1023         j->pin.back             = cur_seq;
1024         atomic64_set(&j->seq, cur_seq - 1);
1025
1026         fifo_for_each_entry_ptr(p, &j->pin, seq) {
1027                 INIT_LIST_HEAD(&p->list);
1028                 INIT_LIST_HEAD(&p->flushed);
1029                 atomic_set(&p->count, 1);
1030                 p->devs.nr = 0;
1031         }
1032
1033         list_for_each_entry(i, journal_entries, list) {
1034                 seq = le64_to_cpu(i->j.seq);
1035                 BUG_ON(seq >= cur_seq);
1036
1037                 if (seq < last_seq)
1038                         continue;
1039
1040                 journal_seq_pin(j, seq)->devs = i->devs;
1041         }
1042
1043         spin_lock(&j->lock);
1044
1045         set_bit(JOURNAL_STARTED, &j->flags);
1046
1047         journal_pin_new_entry(j, 1);
1048         bch2_journal_buf_init(j);
1049
1050         c->last_bucket_seq_cleanup = journal_cur_seq(j);
1051
1052         bch2_journal_space_available(j);
1053         spin_unlock(&j->lock);
1054
1055         return 0;
1056 }
1057
1058 /* init/exit: */
1059
1060 void bch2_dev_journal_exit(struct bch_dev *ca)
1061 {
1062         kfree(ca->journal.bio);
1063         kfree(ca->journal.buckets);
1064         kfree(ca->journal.bucket_seq);
1065
1066         ca->journal.bio         = NULL;
1067         ca->journal.buckets     = NULL;
1068         ca->journal.bucket_seq  = NULL;
1069 }
1070
1071 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
1072 {
1073         struct journal_device *ja = &ca->journal;
1074         struct bch_sb_field_journal *journal_buckets =
1075                 bch2_sb_get_journal(sb);
1076         unsigned i;
1077
1078         ja->nr = bch2_nr_journal_buckets(journal_buckets);
1079
1080         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1081         if (!ja->bucket_seq)
1082                 return -ENOMEM;
1083
1084         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
1085                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
1086         if (!ca->journal.bio)
1087                 return -ENOMEM;
1088
1089         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
1090         if (!ja->buckets)
1091                 return -ENOMEM;
1092
1093         for (i = 0; i < ja->nr; i++)
1094                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
1095
1096         return 0;
1097 }
1098
1099 void bch2_fs_journal_exit(struct journal *j)
1100 {
1101         kvpfree(j->buf[1].data, j->buf[1].buf_size);
1102         kvpfree(j->buf[0].data, j->buf[0].buf_size);
1103         free_fifo(&j->pin);
1104 }
1105
1106 int bch2_fs_journal_init(struct journal *j)
1107 {
1108         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1109         static struct lock_class_key res_key;
1110         int ret = 0;
1111
1112         pr_verbose_init(c->opts, "");
1113
1114         spin_lock_init(&j->lock);
1115         spin_lock_init(&j->err_lock);
1116         init_waitqueue_head(&j->wait);
1117         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
1118         INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
1119         init_waitqueue_head(&j->pin_flush_wait);
1120         mutex_init(&j->reclaim_lock);
1121         mutex_init(&j->discard_lock);
1122
1123         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
1124
1125         j->buf[0].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1126         j->buf[1].buf_size      = JOURNAL_ENTRY_SIZE_MIN;
1127         j->write_delay_ms       = 1000;
1128         j->reclaim_delay_ms     = 100;
1129
1130         /* Btree roots: */
1131         j->entry_u64s_reserved +=
1132                 BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
1133
1134         atomic64_set(&j->reservations.counter,
1135                 ((union journal_res_state)
1136                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
1137
1138         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
1139             !(j->buf[0].data = kvpmalloc(j->buf[0].buf_size, GFP_KERNEL)) ||
1140             !(j->buf[1].data = kvpmalloc(j->buf[1].buf_size, GFP_KERNEL))) {
1141                 ret = -ENOMEM;
1142                 goto out;
1143         }
1144
1145         j->pin.front = j->pin.back = 1;
1146 out:
1147         pr_verbose_init(c->opts, "ret %i", ret);
1148         return ret;
1149 }
1150
1151 /* debug: */
1152
1153 void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
1154 {
1155         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1156         union journal_res_state s;
1157         struct bch_dev *ca;
1158         unsigned iter;
1159
1160         rcu_read_lock();
1161         spin_lock(&j->lock);
1162         s = READ_ONCE(j->reservations);
1163
1164         pr_buf(out,
1165                "active journal entries:\t%llu\n"
1166                "seq:\t\t\t%llu\n"
1167                "last_seq:\t\t%llu\n"
1168                "last_seq_ondisk:\t%llu\n"
1169                "prereserved:\t\t%u/%u\n"
1170                "current entry sectors:\t%u\n"
1171                "current entry:\t\t",
1172                fifo_used(&j->pin),
1173                journal_cur_seq(j),
1174                journal_last_seq(j),
1175                j->last_seq_ondisk,
1176                j->prereserved.reserved,
1177                j->prereserved.remaining,
1178                j->cur_entry_sectors);
1179
1180         switch (s.cur_entry_offset) {
1181         case JOURNAL_ENTRY_ERROR_VAL:
1182                 pr_buf(out, "error\n");
1183                 break;
1184         case JOURNAL_ENTRY_CLOSED_VAL:
1185                 pr_buf(out, "closed\n");
1186                 break;
1187         default:
1188                 pr_buf(out, "%u/%u\n",
1189                        s.cur_entry_offset,
1190                        j->cur_entry_u64s);
1191                 break;
1192         }
1193
1194         pr_buf(out,
1195                "current entry refs:\t%u\n"
1196                "prev entry unwritten:\t",
1197                journal_state_count(s, s.idx));
1198
1199         if (s.prev_buf_unwritten)
1200                 pr_buf(out, "yes, ref %u sectors %u\n",
1201                        journal_state_count(s, !s.idx),
1202                        journal_prev_buf(j)->sectors);
1203         else
1204                 pr_buf(out, "no\n");
1205
1206         pr_buf(out,
1207                "need write:\t\t%i\n"
1208                "replay done:\t\t%i\n",
1209                test_bit(JOURNAL_NEED_WRITE,     &j->flags),
1210                test_bit(JOURNAL_REPLAY_DONE,    &j->flags));
1211
1212         for_each_member_device_rcu(ca, c, iter,
1213                                    &c->rw_devs[BCH_DATA_journal]) {
1214                 struct journal_device *ja = &ca->journal;
1215
1216                 if (!ja->nr)
1217                         continue;
1218
1219                 pr_buf(out,
1220                        "dev %u:\n"
1221                        "\tnr\t\t%u\n"
1222                        "\tavailable\t%u:%u\n"
1223                        "\tdiscard_idx\t\t%u\n"
1224                        "\tdirty_idx_ondisk\t%u (seq %llu)\n"
1225                        "\tdirty_idx\t\t%u (seq %llu)\n"
1226                        "\tcur_idx\t\t%u (seq %llu)\n",
1227                        iter, ja->nr,
1228                        bch2_journal_dev_buckets_available(j, ja, journal_space_discarded),
1229                        ja->sectors_free,
1230                        ja->discard_idx,
1231                        ja->dirty_idx_ondisk,    ja->bucket_seq[ja->dirty_idx_ondisk],
1232                        ja->dirty_idx,           ja->bucket_seq[ja->dirty_idx],
1233                        ja->cur_idx,             ja->bucket_seq[ja->cur_idx]);
1234         }
1235
1236         spin_unlock(&j->lock);
1237         rcu_read_unlock();
1238 }
1239
1240 void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
1241 {
1242         struct journal_entry_pin_list *pin_list;
1243         struct journal_entry_pin *pin;
1244         u64 i;
1245
1246         spin_lock(&j->lock);
1247         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
1248                 pr_buf(out, "%llu: count %u\n",
1249                        i, atomic_read(&pin_list->count));
1250
1251                 list_for_each_entry(pin, &pin_list->list, list)
1252                         pr_buf(out, "\t%px %ps\n",
1253                                pin, pin->flush);
1254
1255                 if (!list_empty(&pin_list->flushed))
1256                         pr_buf(out, "flushed:\n");
1257
1258                 list_for_each_entry(pin, &pin_list->flushed, list)
1259                         pr_buf(out, "\t%px %ps\n",
1260                                pin, pin->flush);
1261         }
1262         spin_unlock(&j->lock);
1263 }