1 /*
2  * bcachefs journalling code, for btree insertions
3  *
4  * Copyright 2012 Google, Inc.
5  */
6
7 #include "bcachefs.h"
8 #include "alloc_foreground.h"
9 #include "bkey_methods.h"
10 #include "btree_gc.h"
11 #include "buckets.h"
12 #include "journal.h"
13 #include "journal_io.h"
14 #include "journal_reclaim.h"
15 #include "journal_seq_blacklist.h"
16 #include "super-io.h"
17
18 #include <trace/events/bcachefs.h>
19
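/*
 * A journal entry is "open" (accepting new reservations) as long as
 * cur_entry_offset holds a real offset rather than the CLOSED/ERROR sentinels:
 */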
20 static bool journal_entry_is_open(struct journal *j)
21 {
22         return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
23 }
24
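/*
 * Called when the last reference to the previous journal buffer is dropped:
 * release the pin taken for that entry, account how long the write was held
 * up, and kick off the actual write via bch2_journal_write():
 */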
25 void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
26 {
27         struct journal_buf *w = journal_prev_buf(j);
28
29         atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
30
31         if (!need_write_just_set &&
32             test_bit(JOURNAL_NEED_WRITE, &j->flags))
33                 bch2_time_stats_update(j->delay_time,
34                                        j->need_write_time);
35
36         closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
37 }
38
39 static void journal_pin_new_entry(struct journal *j, int count)
40 {
41         struct journal_entry_pin_list *p;
42
43         /*
44          * The fifo_push() needs to happen at the same time as j->seq is
45          * incremented for journal_last_seq() to be calculated correctly
46          */
47         atomic64_inc(&j->seq);
48         p = fifo_push_ref(&j->pin);
49
50         INIT_LIST_HEAD(&p->list);
51         INIT_LIST_HEAD(&p->flushed);
52         atomic_set(&p->count, count);
53         p->devs.nr = 0;
54 }
55
56 static void bch2_journal_buf_init(struct journal *j)
57 {
58         struct journal_buf *buf = journal_cur_buf(j);
59
60         memset(buf->has_inode, 0, sizeof(buf->has_inode));
61
62         memset(buf->data, 0, sizeof(*buf->data));
63         buf->data->seq  = cpu_to_le64(journal_cur_seq(j));
64         buf->data->u64s = 0;
65 }
66
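/*
 * Worst-case space that must stay free in every journal entry for the btree
 * roots, which are only added right before the entry is written:
 */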
67 static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
68 {
69         return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
70 }
71
72 static inline bool journal_entry_empty(struct jset *j)
73 {
74         struct jset_entry *i;
75
76         if (j->seq != j->last_seq)
77                 return false;
78
79         vstruct_for_each(j, i)
80                 if (i->type || i->u64s)
81                         return false;
82         return true;
83 }
84
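/*
 * Close the currently open journal entry and switch to the other buffer so a
 * new entry can be opened. Expects j->lock held; the lock is dropped only on
 * the JOURNAL_UNLOCKED return, after the buffer switch has happened:
 */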
85 static enum {
86         JOURNAL_ENTRY_ERROR,
87         JOURNAL_ENTRY_INUSE,
88         JOURNAL_ENTRY_CLOSED,
89         JOURNAL_UNLOCKED,
90 } journal_buf_switch(struct journal *j, bool need_write_just_set)
91 {
92         struct bch_fs *c = container_of(j, struct bch_fs, journal);
93         struct journal_buf *buf = journal_cur_buf(j);
94         union journal_res_state old, new;
95         u64 v = atomic64_read(&j->reservations.counter);
96
97         lockdep_assert_held(&j->lock);
98
99         do {
100                 old.v = new.v = v;
101                 if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
102                         return JOURNAL_ENTRY_CLOSED;
103
104                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
105                         /* this entry will never be written: */
106                         closure_wake_up(&buf->wait);
107                         return JOURNAL_ENTRY_ERROR;
108                 }
109
110                 if (new.prev_buf_unwritten)
111                         return JOURNAL_ENTRY_INUSE;
112
113                 /*
114                  * avoid race between setting buf->data->u64s and
115                  * journal_res_put starting write:
116                  */
117                 journal_state_inc(&new);
118
119                 new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
120                 new.idx++;
121                 new.prev_buf_unwritten = 1;
122
123                 BUG_ON(journal_state_count(new, new.idx));
124         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
125                                        old.v, new.v)) != old.v);
126
127         clear_bit(JOURNAL_NEED_WRITE, &j->flags);
128
129         buf->data->u64s         = cpu_to_le32(old.cur_entry_offset);
130
131         j->prev_buf_sectors =
132                 vstruct_blocks_plus(buf->data, c->block_bits,
133                                     journal_entry_u64s_reserve(buf)) *
134                 c->opts.block_size;
135         BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
136
137         /*
138          * We have to set last_seq here, _before_ opening a new journal entry:
139          *
140          * A thread may replace an old pin with a new pin on its current
141          * journal reservation - the expectation being that the journal will
142          * contain either what the old pin protected or what the new pin
143          * protects.
144          *
145          * After the old pin is dropped journal_last_seq() won't include the old
146          * pin, so we can only write the updated last_seq on the entry that
147          * contains whatever the new pin protects.
148          *
149          * Restated, we can _not_ update last_seq for a given entry if there
150          * could be a newer entry open with reservations/pins that have been
151          * taken against it.
152          *
153          * Hence, we want update/set last_seq on the current journal entry right
154          * before we open a new one:
155          */
156         bch2_journal_reclaim_fast(j);
157         buf->data->last_seq     = cpu_to_le64(journal_last_seq(j));
158
159         if (journal_entry_empty(buf->data))
160                 clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
161         else
162                 set_bit(JOURNAL_NOT_EMPTY, &j->flags);
163
164         journal_pin_new_entry(j, 1);
165
166         bch2_journal_buf_init(j);
167
168         cancel_delayed_work(&j->write_work);
169         spin_unlock(&j->lock);
170
171         /* ugh - might be called from __journal_res_get() under wait_event() */
172         __set_current_state(TASK_RUNNING);
173         bch2_journal_buf_put(j, old.idx, need_write_just_set);
174
175         return JOURNAL_UNLOCKED;
176 }
177
178 void bch2_journal_halt(struct journal *j)
179 {
180         union journal_res_state old, new;
181         u64 v = atomic64_read(&j->reservations.counter);
182
183         do {
184                 old.v = new.v = v;
185                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
186                         return;
187
188                 new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
189         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
190                                        old.v, new.v)) != old.v);
191
192         journal_wake(j);
193         closure_wake_up(&journal_cur_buf(j)->wait);
194         closure_wake_up(&journal_prev_buf(j)->wait);
195 }
196
197 /*
198  * should _only_ be called from journal_res_get() - when we actually want a
199  * journal reservation - journal entry is open means journal is dirty:
200  *
201  * returns:
202  * 1:           success
203  * 0:           journal currently full (must wait)
204  * -EROFS:      insufficient rw devices
205  * -EIO:        journal error
206  */
207 static int journal_entry_open(struct journal *j)
208 {
209         struct journal_buf *buf = journal_cur_buf(j);
210         union journal_res_state old, new;
211         ssize_t u64s;
212         int sectors;
213         u64 v;
214
215         lockdep_assert_held(&j->lock);
216         BUG_ON(journal_entry_is_open(j));
217
218         if (!fifo_free(&j->pin))
219                 return 0;
220
221         sectors = bch2_journal_entry_sectors(j);
222         if (sectors <= 0)
223                 return sectors;
224
225         buf->disk_sectors       = sectors;
226
227         sectors = min_t(unsigned, sectors, buf->size >> 9);
228         j->cur_buf_sectors      = sectors;
229
230         u64s = (sectors << 9) / sizeof(u64);
231
232         /* Subtract the journal header */
233         u64s -= sizeof(struct jset) / sizeof(u64);
234         /*
235          * Btree roots, prio pointers don't get added until right before we do
236          * the write:
237          */
238         u64s -= journal_entry_u64s_reserve(buf);
239         u64s  = max_t(ssize_t, 0L, u64s);
240
241         BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
242
243         if (u64s <= le32_to_cpu(buf->data->u64s))
244                 return 0;
245
246         /*
247          * Must be set before marking the journal entry as open:
248          */
249         j->cur_entry_u64s = u64s;
250
251         v = atomic64_read(&j->reservations.counter);
252         do {
253                 old.v = new.v = v;
254
255                 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
256                         return -EIO;
257
258                 /* Handle any already added entries */
259                 new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
260         } while ((v = atomic64_cmpxchg(&j->reservations.counter,
261                                        old.v, new.v)) != old.v);
262
263         if (j->res_get_blocked_start)
264                 bch2_time_stats_update(j->blocked_time,
265                                        j->res_get_blocked_start);
266         j->res_get_blocked_start = 0;
267
268         mod_delayed_work(system_freezable_wq,
269                          &j->write_work,
270                          msecs_to_jiffies(j->write_delay_ms));
271         journal_wake(j);
272         return 1;
273 }
274
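/*
 * Close the current journal entry, if there is one. Expects j->lock held and
 * always drops it; returns false if the previous buffer is still unwritten and
 * the entry couldn't be closed yet:
 */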
275 static bool __journal_entry_close(struct journal *j)
276 {
277         bool set_need_write;
278
279         if (!journal_entry_is_open(j)) {
280                 spin_unlock(&j->lock);
281                 return true;
282         }
283
284         set_need_write = !test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags);
285         if (set_need_write)
286                 j->need_write_time = local_clock();
287
288         switch (journal_buf_switch(j, set_need_write)) {
289         case JOURNAL_ENTRY_INUSE:
290                 spin_unlock(&j->lock);
291                 return false;
292         default:
293                 spin_unlock(&j->lock);
294         case JOURNAL_UNLOCKED:
295                 return true;
296         }
297 }
298
299 static bool journal_entry_close(struct journal *j)
300 {
301         spin_lock(&j->lock);
302         return __journal_entry_close(j);
303 }
304
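/*
 * Delayed work scheduled from journal_entry_open(): close the current entry
 * after write_delay_ms so it gets written even with no further activity:
 */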
305 static void journal_write_work(struct work_struct *work)
306 {
307         struct journal *j = container_of(work, struct journal, write_work.work);
308
309         journal_entry_close(j);
310 }
311
312 /*
313  * Given an inode number, if that inode number has data in the journal that
314  * hasn't yet been flushed, return the journal sequence number that needs to be
315  * flushed:
316  */
317 u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
318 {
319         size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
320         u64 seq = 0;
321
322         if (!test_bit(h, j->buf[0].has_inode) &&
323             !test_bit(h, j->buf[1].has_inode))
324                 return 0;
325
326         spin_lock(&j->lock);
327         if (test_bit(h, journal_cur_buf(j)->has_inode))
328                 seq = journal_cur_seq(j);
329         else if (test_bit(h, journal_prev_buf(j)->has_inode))
330                 seq = journal_cur_seq(j) - 1;
331         spin_unlock(&j->lock);
332
333         return seq;
334 }
335
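/*
 * Slowpath for taking a journal reservation: retry the fastpath under j->lock,
 * then close the current entry and try to open a new one. Returns 1 on
 * success, 0 if the journal is full and the caller must wait, or a negative
 * error code:
 */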
336 static int __journal_res_get(struct journal *j, struct journal_res *res,
337                               unsigned u64s_min, unsigned u64s_max)
338 {
339         struct bch_fs *c = container_of(j, struct bch_fs, journal);
340         struct journal_buf *buf;
341         int ret;
342 retry:
343         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
344         if (ret)
345                 return ret;
346
347         spin_lock(&j->lock);
348         /*
349          * Recheck after taking the lock, so we don't race with another thread
350          * that just did journal_entry_open() and end up calling journal_entry_close()
351          * unnecessarily
352          */
353         ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
354         if (ret) {
355                 spin_unlock(&j->lock);
356                 return 1;
357         }
358
359         /*
360          * If we couldn't get a reservation because the current buf filled up,
361          * and we had room for a bigger entry on disk, signal that we want to
362          * realloc the journal bufs:
363          */
364         buf = journal_cur_buf(j);
365         if (journal_entry_is_open(j) &&
366             buf->size >> 9 < buf->disk_sectors &&
367             buf->size < JOURNAL_ENTRY_SIZE_MAX)
368                 j->buf_size_want = max(j->buf_size_want, buf->size << 1);
369
370         /*
371          * Close the current journal entry if necessary, then try to start a new
372          * one:
373          */
374         switch (journal_buf_switch(j, false)) {
375         case JOURNAL_ENTRY_ERROR:
376                 spin_unlock(&j->lock);
377                 return -EROFS;
378         case JOURNAL_ENTRY_INUSE:
379                 /* haven't finished writing out the previous one: */
380                 spin_unlock(&j->lock);
381                 trace_journal_entry_full(c);
382                 goto blocked;
383         case JOURNAL_ENTRY_CLOSED:
384                 break;
385         case JOURNAL_UNLOCKED:
386                 goto retry;
387         }
388
389         /* We now have a new, closed journal buf - see if we can open it: */
390         ret = journal_entry_open(j);
391         spin_unlock(&j->lock);
392
393         if (ret < 0)
394                 return ret;
395         if (ret)
396                 goto retry;
397
398         /* Journal's full, we have to wait */
399
400         /*
401          * Direct reclaim - can't rely on reclaim from work item
402          * due to freezing..
403          */
404         bch2_journal_reclaim_work(&j->reclaim_work.work);
405
406         trace_journal_full(c);
407 blocked:
408         if (!j->res_get_blocked_start)
409                 j->res_get_blocked_start = local_clock() ?: 1;
410         return 0;
411 }
412
413 /*
414  * Essentially the entry function to the journaling code. When bcachefs is doing
415  * a btree insert, it calls this function to get the current journal write.
416  * The journal write is the structure used to set up journal writes. The calling
417  * function will then add its keys to the structure, queuing them for the next
418  * write.
419  *
420  * To ensure forward progress, the current task must not be holding any
421  * btree node write locks.
422  */
423 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
424                                  unsigned u64s_min, unsigned u64s_max)
425 {
426         int ret;
427
428         wait_event(j->wait,
429                    (ret = __journal_res_get(j, res, u64s_min,
430                                             u64s_max)));
431         return ret < 0 ? ret : 0;
432 }
433
434 u64 bch2_journal_last_unwritten_seq(struct journal *j)
435 {
436         u64 seq;
437
438         spin_lock(&j->lock);
439         seq = journal_cur_seq(j);
440         if (j->reservations.prev_buf_unwritten)
441                 seq--;
442         spin_unlock(&j->lock);
443
444         return seq;
445 }
446
447 /**
448  * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
449  * open yet, or wait if we cannot
450  *
451  * used by the btree interior update machinery, when it needs to write a new
452  * btree root - every journal entry contains the roots of all the btrees, so it
453  * doesn't need to bother with getting a journal reservation
454  */
455 int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
456 {
457         int ret;
458
459         spin_lock(&j->lock);
460         BUG_ON(seq > journal_cur_seq(j));
461
462         if (seq < journal_cur_seq(j) ||
463             journal_entry_is_open(j)) {
464                 spin_unlock(&j->lock);
465                 return 1;
466         }
467
468         ret = journal_entry_open(j);
469         if (!ret)
470                 closure_wait(&j->async_wait, parent);
471         spin_unlock(&j->lock);
472
473         if (!ret)
474                 bch2_journal_reclaim_work(&j->reclaim_work.work);
475
476         return ret;
477 }
478
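/*
 * Error status for journal entry @seq: the journal error state if @seq is the
 * current entry, -EIO if @seq was the previous entry and never made it to
 * disk, 0 otherwise:
 */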
479 static int journal_seq_error(struct journal *j, u64 seq)
480 {
481         union journal_res_state state = READ_ONCE(j->reservations);
482
483         if (seq == journal_cur_seq(j))
484                 return bch2_journal_error(j);
485
486         if (seq + 1 == journal_cur_seq(j) &&
487             !state.prev_buf_unwritten &&
488             seq > j->seq_ondisk)
489                 return -EIO;
490
491         return 0;
492 }
493
494 static inline struct journal_buf *
495 journal_seq_to_buf(struct journal *j, u64 seq)
496 {
497         /* seq should be for a journal entry that has been opened: */
498         BUG_ON(seq > journal_cur_seq(j));
499         BUG_ON(seq == journal_cur_seq(j) &&
500                j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
501
502         if (seq == journal_cur_seq(j))
503                 return journal_cur_buf(j);
504         if (seq + 1 == journal_cur_seq(j) &&
505             j->reservations.prev_buf_unwritten)
506                 return journal_prev_buf(j);
507         return NULL;
508 }
509
510 /**
511  * bch2_journal_wait_on_seq - wait for a journal entry to be written
512  *
513  * does _not_ cause @seq to be written immediately - if there is no other
514  * activity to cause the relevant journal entry to be filled up or flushed it
515  * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
516  * configurable).
517  */
518 void bch2_journal_wait_on_seq(struct journal *j, u64 seq,
519                               struct closure *parent)
520 {
521         struct journal_buf *buf;
522
523         spin_lock(&j->lock);
524
525         if ((buf = journal_seq_to_buf(j, seq))) {
526                 if (!closure_wait(&buf->wait, parent))
527                         BUG();
528
529                 if (seq == journal_cur_seq(j)) {
530                         smp_mb();
531                         if (bch2_journal_error(j))
532                                 closure_wake_up(&buf->wait);
533                 }
534         }
535
536         spin_unlock(&j->lock);
537 }
538
539 /**
540  * bch2_journal_flush_seq_async - wait for a journal entry to be written
541  *
542  * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
543  * necessary
544  */
545 void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
546                                   struct closure *parent)
547 {
548         struct journal_buf *buf;
549
550         spin_lock(&j->lock);
551
552         if (parent &&
553             (buf = journal_seq_to_buf(j, seq)))
554                 if (!closure_wait(&buf->wait, parent))
555                         BUG();
556
557         if (seq == journal_cur_seq(j))
558                 __journal_entry_close(j);
559         else
560                 spin_unlock(&j->lock);
561 }
562
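/*
 * Returns nonzero once @seq is on disk (or has failed), 0 if the caller should
 * keep waiting; also closes the current entry to kick off a write if @seq is
 * still open:
 */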
563 static int journal_seq_flushed(struct journal *j, u64 seq)
564 {
565         int ret;
566
567         spin_lock(&j->lock);
568         ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq);
569
570         if (seq == journal_cur_seq(j))
571                 __journal_entry_close(j);
572         else
573                 spin_unlock(&j->lock);
574
575         return ret;
576 }
577
578 int bch2_journal_flush_seq(struct journal *j, u64 seq)
579 {
580         u64 start_time = local_clock();
581         int ret, ret2;
582
583         ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
584
585         bch2_time_stats_update(j->flush_seq_time, start_time);
586
587         return ret ?: ret2 < 0 ? ret2 : 0;
588 }
589
590 /**
591  * bch2_journal_meta_async - force a journal entry to be written
592  */
593 void bch2_journal_meta_async(struct journal *j, struct closure *parent)
594 {
595         struct journal_res res;
596         unsigned u64s = jset_u64s(0);
597
598         memset(&res, 0, sizeof(res));
599
600         bch2_journal_res_get(j, &res, u64s, u64s);
601         bch2_journal_res_put(j, &res);
602
603         bch2_journal_flush_seq_async(j, res.seq, parent);
604 }
605
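/*
 * bch2_journal_meta - take and immediately release an empty journal
 * reservation, then flush the entry it landed in, forcing a journal write:
 */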
606 int bch2_journal_meta(struct journal *j)
607 {
608         struct journal_res res;
609         unsigned u64s = jset_u64s(0);
610         int ret;
611
612         memset(&res, 0, sizeof(res));
613
614         ret = bch2_journal_res_get(j, &res, u64s, u64s);
615         if (ret)
616                 return ret;
617
618         bch2_journal_res_put(j, &res);
619
620         return bch2_journal_flush_seq(j, res.seq);
621 }
622
623 /*
624  * bch2_journal_flush_async - if there is an open journal entry, or a journal
625  * still being written, write it and wait for the write to complete
626  */
627 void bch2_journal_flush_async(struct journal *j, struct closure *parent)
628 {
629         u64 seq, journal_seq;
630
631         spin_lock(&j->lock);
632         journal_seq = journal_cur_seq(j);
633
634         if (journal_entry_is_open(j)) {
635                 seq = journal_seq;
636         } else if (journal_seq) {
637                 seq = journal_seq - 1;
638         } else {
639                 spin_unlock(&j->lock);
640                 return;
641         }
642         spin_unlock(&j->lock);
643
644         bch2_journal_flush_seq_async(j, seq, parent);
645 }
646
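/*
 * bch2_journal_flush - synchronous version of bch2_journal_flush_async():
 * flush the current (or most recently closed) journal entry and wait for it:
 */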
647 int bch2_journal_flush(struct journal *j)
648 {
649         u64 seq, journal_seq;
650
651         spin_lock(&j->lock);
652         journal_seq = journal_cur_seq(j);
653
654         if (journal_entry_is_open(j)) {
655                 seq = journal_seq;
656         } else if (journal_seq) {
657                 seq = journal_seq - 1;
658         } else {
659                 spin_unlock(&j->lock);
660                 return 0;
661         }
662         spin_unlock(&j->lock);
663
664         return bch2_journal_flush_seq(j, seq);
665 }
666
667 /* allocate journal on a device: */
668
669 static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
670                                          bool new_fs, struct closure *cl)
671 {
672         struct bch_fs *c = ca->fs;
673         struct journal_device *ja = &ca->journal;
674         struct bch_sb_field_journal *journal_buckets;
675         u64 *new_bucket_seq = NULL, *new_buckets = NULL;
676         int ret = 0;
677
678         /* don't handle reducing nr of buckets yet: */
679         if (nr <= ja->nr)
680                 return 0;
681
682         ret = -ENOMEM;
683         new_buckets     = kzalloc(nr * sizeof(u64), GFP_KERNEL);
684         new_bucket_seq  = kzalloc(nr * sizeof(u64), GFP_KERNEL);
685         if (!new_buckets || !new_bucket_seq)
686                 goto err;
687
688         journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
689                                 nr + sizeof(*journal_buckets) / sizeof(u64));
690         if (!journal_buckets)
691                 goto err;
692
693         /*
694          * We may be called from the device add path, before the new device has
695          * actually been added to the running filesystem:
696          */
697         if (c)
698                 spin_lock(&c->journal.lock);
699
700         memcpy(new_buckets,     ja->buckets,    ja->nr * sizeof(u64));
701         memcpy(new_bucket_seq,  ja->bucket_seq, ja->nr * sizeof(u64));
702         swap(new_buckets,       ja->buckets);
703         swap(new_bucket_seq,    ja->bucket_seq);
704
705         if (c)
706                 spin_unlock(&c->journal.lock);
707
708         while (ja->nr < nr) {
709                 struct open_bucket *ob = NULL;
710                 long bucket;
711
712                 if (new_fs) {
713                         bucket = bch2_bucket_alloc_new_fs(ca);
714                         if (bucket < 0) {
715                                 ret = -ENOSPC;
716                                 goto err;
717                         }
718                 } else {
719                         ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC,
720                                                false, cl);
721                         if (IS_ERR(ob)) {
722                                 ret = cl ? -EAGAIN : -ENOSPC;
723                                 goto err;
724                         }
725
726                         bucket = sector_to_bucket(ca, ob->ptr.offset);
727                 }
728
729                 if (c) {
730                         percpu_down_read_preempt_disable(&c->usage_lock);
731                         spin_lock(&c->journal.lock);
732                 } else {
733                         preempt_disable();
734                 }
735
736                 __array_insert_item(ja->buckets,                ja->nr, ja->last_idx);
737                 __array_insert_item(ja->bucket_seq,             ja->nr, ja->last_idx);
738                 __array_insert_item(journal_buckets->buckets,   ja->nr, ja->last_idx);
739
740                 ja->buckets[ja->last_idx] = bucket;
741                 ja->bucket_seq[ja->last_idx] = 0;
742                 journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);
743
744                 if (ja->last_idx < ja->nr) {
745                         if (ja->cur_idx >= ja->last_idx)
746                                 ja->cur_idx++;
747                         ja->last_idx++;
748                 }
749                 ja->nr++;
750
751                 bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
752                                 ca->mi.bucket_size,
753                                 gc_phase(GC_PHASE_SB),
754                                 new_fs
755                                 ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
756                                 : 0);
757
758                 if (c) {
759                         spin_unlock(&c->journal.lock);
760                         percpu_up_read_preempt_enable(&c->usage_lock);
761                 } else {
762                         preempt_enable();
763                 }
764
765                 if (!new_fs)
766                         bch2_open_bucket_put(c, ob);
767         }
768
769         ret = 0;
770 err:
771         kfree(new_bucket_seq);
772         kfree(new_buckets);
773
774         return ret;
775 }
776
777 /*
778  * Allocate more journal space at runtime - not currently making use of it, but
779  * the code works:
780  */
781 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
782                                 unsigned nr)
783 {
784         struct journal_device *ja = &ca->journal;
785         struct closure cl;
786         unsigned current_nr;
787         int ret;
788
789         closure_init_stack(&cl);
790
791         do {
792                 struct disk_reservation disk_res = { 0, 0 };
793
794                 closure_sync(&cl);
795
796                 mutex_lock(&c->sb_lock);
797                 current_nr = ja->nr;
798
799                 /*
800                  * note: journal buckets aren't really counted as _sectors_ used yet, so
801                  * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
802                  * when space used goes up without a reservation - but we do need the
803                  * reservation to ensure we'll actually be able to allocate:
804                  */
805
806                 if (bch2_disk_reservation_get(c, &disk_res,
807                                 bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
808                         mutex_unlock(&c->sb_lock);
809                         return -ENOSPC;
810                 }
811
812                 ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
813
814                 bch2_disk_reservation_put(c, &disk_res);
815
816                 if (ja->nr != current_nr)
817                         bch2_write_super(c);
818                 mutex_unlock(&c->sb_lock);
819         } while (ret == -EAGAIN);
820
821         return ret;
822 }
823
824 int bch2_dev_journal_alloc(struct bch_dev *ca)
825 {
826         unsigned nr;
827
828         if (dynamic_fault("bcachefs:add:journal_alloc"))
829                 return -ENOMEM;
830
831         /*
832          * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
833          * is smaller:
834          */
835         nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
836                      BCH_JOURNAL_BUCKETS_MIN,
837                      min(1 << 10,
838                          (1 << 20) / ca->mi.bucket_size));
839
840         return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
841 }
842
843 /* startup/shutdown: */
844
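/* Does the journal write currently in flight have a pointer to @dev_idx? */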
845 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
846 {
847         union journal_res_state state;
848         struct journal_buf *w;
849         bool ret;
850
851         spin_lock(&j->lock);
852         state = READ_ONCE(j->reservations);
853         w = j->buf + !state.idx;
854
855         ret = state.prev_buf_unwritten &&
856                 bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
857         spin_unlock(&j->lock);
858
859         return ret;
860 }
861
862 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
863 {
864         spin_lock(&j->lock);
865         bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
866         spin_unlock(&j->lock);
867
868         wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
869 }
870
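/*
 * Shut the journal down at filesystem stop: close the current entry, write one
 * final entry if the journal or btree roots are dirty, then cancel the
 * background write/reclaim work:
 */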
871 void bch2_fs_journal_stop(struct journal *j)
872 {
873         struct bch_fs *c = container_of(j, struct bch_fs, journal);
874
875         wait_event(j->wait, journal_entry_close(j));
876
877         /* do we need to write another journal entry? */
878         if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
879             c->btree_roots_dirty)
880                 bch2_journal_meta(j);
881
882         BUG_ON(journal_entry_is_open(j) ||
883                j->reservations.prev_buf_unwritten);
884
885         BUG_ON(!bch2_journal_error(j) &&
886                test_bit(JOURNAL_NOT_EMPTY, &j->flags));
887
888         cancel_delayed_work_sync(&j->write_work);
889         cancel_delayed_work_sync(&j->reclaim_work);
890 }
891
892 void bch2_fs_journal_start(struct journal *j)
893 {
894         struct bch_fs *c = container_of(j, struct bch_fs, journal);
895         struct journal_seq_blacklist *bl;
896         u64 blacklist = 0;
897
898         list_for_each_entry(bl, &j->seq_blacklist, list)
899                 blacklist = max(blacklist, bl->end);
900
901         spin_lock(&j->lock);
902
903         set_bit(JOURNAL_STARTED, &j->flags);
904
905         while (journal_cur_seq(j) < blacklist)
906                 journal_pin_new_entry(j, 0);
907
908         /*
909          * journal_buf_switch() only inits the next journal entry when it
910          * closes an open journal entry - the very first journal entry gets
911          * initialized here:
912          */
913         journal_pin_new_entry(j, 1);
914         bch2_journal_buf_init(j);
915
916         c->last_bucket_seq_cleanup = journal_cur_seq(j);
917
918         spin_unlock(&j->lock);
919
920         /*
921          * Adding entries to the next journal entry before allocating space on
922          * disk for the next journal entry - this is ok, because these entries
923          * only have to go down with the next journal entry we write:
924          */
925         bch2_journal_seq_blacklist_write(j);
926
927         queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
928 }
929
930 /* init/exit: */
931
932 void bch2_dev_journal_exit(struct bch_dev *ca)
933 {
934         kfree(ca->journal.bio);
935         kfree(ca->journal.buckets);
936         kfree(ca->journal.bucket_seq);
937
938         ca->journal.bio         = NULL;
939         ca->journal.buckets     = NULL;
940         ca->journal.bucket_seq  = NULL;
941 }
942
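/*
 * Read @ca's journal bucket list from its superblock and allocate the
 * in-memory journal_device state (bucket arrays and the journal bio):
 */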
943 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
944 {
945         struct journal_device *ja = &ca->journal;
946         struct bch_sb_field_journal *journal_buckets =
947                 bch2_sb_get_journal(sb);
948         unsigned i;
949
950         ja->nr = bch2_nr_journal_buckets(journal_buckets);
951
952         ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
953         if (!ja->bucket_seq)
954                 return -ENOMEM;
955
956         ca->journal.bio = bio_kmalloc(GFP_KERNEL,
957                         DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE));
958         if (!ca->journal.bio)
959                 return -ENOMEM;
960
961         ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
962         if (!ja->buckets)
963                 return -ENOMEM;
964
965         for (i = 0; i < ja->nr; i++)
966                 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
967
968         return 0;
969 }
970
971 void bch2_fs_journal_exit(struct journal *j)
972 {
973         kvpfree(j->buf[1].data, j->buf[1].size);
974         kvpfree(j->buf[0].data, j->buf[0].size);
975         free_fifo(&j->pin);
976 }
977
978 int bch2_fs_journal_init(struct journal *j)
979 {
980         struct bch_fs *c = container_of(j, struct bch_fs, journal);
981         static struct lock_class_key res_key;
982         int ret = 0;
983
984         pr_verbose_init(c->opts, "");
985
986         spin_lock_init(&j->lock);
987         spin_lock_init(&j->err_lock);
988         init_waitqueue_head(&j->wait);
989         INIT_DELAYED_WORK(&j->write_work, journal_write_work);
990         INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
991         init_waitqueue_head(&j->pin_flush_wait);
992         mutex_init(&j->blacklist_lock);
993         INIT_LIST_HEAD(&j->seq_blacklist);
994         mutex_init(&j->reclaim_lock);
995
996         lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
997
998         j->buf[0].size          = JOURNAL_ENTRY_SIZE_MIN;
999         j->buf[1].size          = JOURNAL_ENTRY_SIZE_MIN;
1000         j->write_delay_ms       = 1000;
1001         j->reclaim_delay_ms     = 100;
1002
1003         bkey_extent_init(&j->key);
1004
1005         atomic64_set(&j->reservations.counter,
1006                 ((union journal_res_state)
1007                  { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
1008
1009         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
1010             !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
1011             !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
1012                 ret = -ENOMEM;
1013                 goto out;
1014         }
1015
1016         j->pin.front = j->pin.back = 1;
1017 out:
1018         pr_verbose_init(c->opts, "ret %i", ret);
1019         return ret;
1020 }
1021
1022 /* debug: */
1023
1024 ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
1025 {
1026         struct bch_fs *c = container_of(j, struct bch_fs, journal);
1027         union journal_res_state *s = &j->reservations;
1028         struct bch_dev *ca;
1029         unsigned iter;
1030         ssize_t ret = 0;
1031
1032         rcu_read_lock();
1033         spin_lock(&j->lock);
1034
1035         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1036                          "active journal entries:\t%llu\n"
1037                          "seq:\t\t\t%llu\n"
1038                          "last_seq:\t\t%llu\n"
1039                          "last_seq_ondisk:\t%llu\n"
1040                          "reservation count:\t%u\n"
1041                          "reservation offset:\t%u\n"
1042                          "current entry u64s:\t%u\n"
1043                          "io in flight:\t\t%i\n"
1044                          "need write:\t\t%i\n"
1045                          "dirty:\t\t\t%i\n"
1046                          "replay done:\t\t%i\n",
1047                          fifo_used(&j->pin),
1048                          journal_cur_seq(j),
1049                          journal_last_seq(j),
1050                          j->last_seq_ondisk,
1051                          journal_state_count(*s, s->idx),
1052                          s->cur_entry_offset,
1053                          j->cur_entry_u64s,
1054                          s->prev_buf_unwritten,
1055                          test_bit(JOURNAL_NEED_WRITE,   &j->flags),
1056                          journal_entry_is_open(j),
1057                          test_bit(JOURNAL_REPLAY_DONE,  &j->flags));
1058
1059         for_each_member_device_rcu(ca, c, iter,
1060                                    &c->rw_devs[BCH_DATA_JOURNAL]) {
1061                 struct journal_device *ja = &ca->journal;
1062
1063                 if (!ja->nr)
1064                         continue;
1065
1066                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1067                                  "dev %u:\n"
1068                                  "\tnr\t\t%u\n"
1069                                  "\tcur_idx\t\t%u (seq %llu)\n"
1070                                  "\tlast_idx\t%u (seq %llu)\n",
1071                                  iter, ja->nr,
1072                                  ja->cur_idx,   ja->bucket_seq[ja->cur_idx],
1073                                  ja->last_idx,  ja->bucket_seq[ja->last_idx]);
1074         }
1075
1076         spin_unlock(&j->lock);
1077         rcu_read_unlock();
1078
1079         return ret;
1080 }
1081
1082 ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
1083 {
1084         struct journal_entry_pin_list *pin_list;
1085         struct journal_entry_pin *pin;
1086         ssize_t ret = 0;
1087         u64 i;
1088
1089         spin_lock(&j->lock);
1090         fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
1091                 ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1092                                  "%llu: count %u\n",
1093                                  i, atomic_read(&pin_list->count));
1094
1095                 list_for_each_entry(pin, &pin_list->list, list)
1096                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1097                                          "\t%p %pf\n",
1098                                          pin, pin->flush);
1099
1100                 if (!list_empty(&pin_list->flushed))
1101                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1102                                          "flushed:\n");
1103
1104                 list_for_each_entry(pin, &pin_list->flushed, list)
1105                         ret += scnprintf(buf + ret, PAGE_SIZE - ret,
1106                                          "\t%p %pf\n",
1107                                          pin, pin->flush);
1108         }
1109         spin_unlock(&j->lock);
1110
1111         return ret;
1112 }