1 // SPDX-License-Identifier: GPL-2.0
4 #include "btree_locking.h"
5 #include "btree_update.h"
6 #include "btree_update_interior.h"
7 #include "btree_write_buffer.h"
10 #include "journal_reclaim.h"
12 #include <linux/sort.h>
/*
 * Forward declaration of the journal pin flush callback. It is defined later
 * in this file but is handed to the journal pin helpers before that point.
 */
14 static int bch2_btree_write_buffer_journal_flush(struct journal *,
15 struct journal_entry_pin *, u64);
/*
 * sort() comparison callback used for the first flush pass.
 *
 * Orders write buffered keys by btree, then by key position, then by
 * journal_seq, then by journal_offset; the ?: chain yields the first non-zero
 * comparison. Entries for the same btree and position therefore end up
 * adjacent, which is what the duplicate check in the flush loop relies on.
 */
17 static int btree_write_buffered_key_cmp(const void *_l, const void *_r)
19 const struct btree_write_buffered_key *l = _l;
20 const struct btree_write_buffered_key *r = _r;
22 return cmp_int(l->btree, r->btree) ?:
23 bpos_cmp(l->k.k.p, r->k.k.p) ?:
24 cmp_int(l->journal_seq, r->journal_seq) ?:
25 cmp_int(l->journal_offset, r->journal_offset);
/*
 * sort() comparison callback used for the second flush pass: orders write
 * buffered keys by journal_seq only.
 */
28 static int btree_write_buffered_journal_cmp(const void *_l, const void *_r)
30 const struct btree_write_buffered_key *l = _l;
31 const struct btree_write_buffered_key *r = _r;
33 return cmp_int(l->journal_seq, r->journal_seq);
/*
 * Slow path for flushing one key; wb_flush_one() calls this when the key does
 * not fit in the leaf.
 *
 * Releases the write lock on the leaf of the iterator path, then queues the
 * key with bch2_trans_update() and commits it. Returns the first non-zero
 * result of the update or the commit, otherwise the commit result.
 */
36 static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
37 struct btree_iter *iter,
38 struct btree_write_buffered_key *wb)
40 bch2_btree_node_unlock_write(trans, iter->path, iter->path->l[0].b);
/* Use the journal seq stored in the buffered key for this commit. */
42 trans->journal_res.seq = wb->journal_seq;
44 return bch2_trans_update(trans, iter, &wb->k,
45 BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE) ?:
46 bch2_trans_commit(trans, NULL, NULL,
47 BCH_TRANS_COMMIT_no_enospc|
48 BCH_TRANS_COMMIT_no_check_rw|
49 BCH_TRANS_COMMIT_no_journal_res|
50 BCH_TRANS_COMMIT_journal_reclaim);
/*
 * Fast path for flushing one write buffered key directly into a btree leaf.
 *
 * @trans:        transaction the iterator belongs to
 * @iter:         iterator positioned by the caller; traversed here
 * @wb:           key to insert; must carry a non-zero journal_seq
 * @write_locked: in/out flag for whether the leaf write lock is held; it is
 *                cleared before falling back to wb_flush_one_slowpath(),
 *                which drops that lock
 * @fast:         counter owned by the caller; no use of it is visible in this
 *                listing (presumably bumped on a successful leaf insert,
 *                TODO confirm)
 *
 * Visible steps: traverse the iterator, unshare the path when it has more than
 * one reference, write lock and prepare the leaf, then either insert the key
 * into the leaf or hand off to the slow path when it does not fit.
 *
 * NOTE(review): ret is assigned below without a visible declaration and the
 * embedded line numbers skip values, so some statements (error checks, the
 * final return) appear to be missing from this listing.
 */
53 static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
54 struct btree_write_buffered_key *wb,
55 bool *write_locked, size_t *fast)
57 struct bch_fs *c = trans->c;
58 struct btree_path *path;
61 EBUG_ON(!wb->journal_seq);
62 ret = bch2_btree_iter_traverse(iter);
67 * We can't clone a path that has write locks: unshare it now, before
68 * set_pos and traverse():
70 if (iter->path->ref > 1)
71 iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);
76 ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
80 bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
84 if (unlikely(!bch2_btree_node_insert_fits(c, path->l[0].b, wb->k.k.u64s))) {
85 *write_locked = false;
86 return wb_flush_one_slowpath(trans, iter, wb);
89 bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
/*
 * Switch the write buffer state word with a cmpxchg retry loop, then wait for
 * the reference count that belongs to the previous index (ref0 when the old
 * index was 0, ref1 otherwise) to drop to zero.
 *
 * The flush path uses the .idx of the returned state to pick which wb->keys[]
 * array to drain.
 *
 * NOTE(review): the statements that build the new state, the body of the wait
 * loop and the return statement are not visible in this listing.
 */
94 static union btree_write_buffer_state btree_write_buffer_switch(struct btree_write_buffer *wb)
96 union btree_write_buffer_state old, new;
97 u64 v = READ_ONCE(wb->state.v);
104 } while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
106 while (old.idx == 0 ? wb->state.ref0 : wb->state.ref1)
115 * Update a btree with a write buffered key using the journal seq of the
116 * original write buffer insert.
118 * It is not safe to rejournal the key once it has been inserted into the write
119 * buffer because that may break recovery ordering. For example, the key may
120 * have already been modified in the active write buffer in a seq that comes
121 * before the current transaction. If we were to journal this key again and
122 * crash, recovery would process updates in the wrong order.
/*
 * Implementation summary: queues the key as a regular transaction update at
 * the start position of the key, with trans->journal_res.seq set to the seq
 * stored in the key. The iterator is initialised and released inside this
 * function. The second flush pass runs this under commit_do().
 *
 * NOTE(review): the return type line, the declaration of ret and the return
 * statement are not visible in this listing.
 */
125 btree_write_buffered_insert(struct btree_trans *trans,
126 struct btree_write_buffered_key *wb)
128 struct btree_iter iter;
131 bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
132 BTREE_ITER_CACHED|BTREE_ITER_INTENT);
134 trans->journal_res.seq = wb->journal_seq;
136 ret = bch2_btree_iter_traverse(&iter) ?:
137 bch2_trans_update(trans, &iter, &wb->k,
138 BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE);
139 bch2_trans_iter_exit(trans, &iter);
/*
 * Flush the write buffer contents into the btree.
 *
 * Every caller in this file invokes this with wb->flush_lock held.
 *
 * Two passes are visible:
 *  1. keys are sorted by btree and position and flushed through
 *     wb_flush_one(), with adjacent entries for the same btree and position
 *     detected first;
 *  2. keys are sorted again by journal_seq and inserted one at a time through
 *     btree_write_buffered_insert() under commit_do(), updating the local
 *     journal pin before each insert.
 *
 * A private copy of the write buffer journal pin is held for the duration and
 * dropped at the end; a non-zero ret is reported through
 * bch2_fs_fatal_err_on().
 *
 * NOTE(review): several statements (for example the assignment of nr, the
 * declaration of ret, the handling of duplicates and the conditions guarding
 * the second pass) are not visible in this listing.
 */
143 int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
145 struct bch_fs *c = trans->c;
146 struct journal *j = &c->journal;
147 struct btree_write_buffer *wb = &c->btree_write_buffer;
148 struct journal_entry_pin pin;
149 struct btree_write_buffered_key *i, *keys;
150 struct btree_iter iter = { NULL };
151 size_t nr = 0, skipped = 0, fast = 0, slowpath = 0;
152 bool write_locked = false;
153 union btree_write_buffer_state s;
156 memset(&pin, 0, sizeof(pin));
/* Move the write buffer pin into a local pin before dropping the shared one. */
158 bch2_journal_pin_copy(j, &pin, &wb->journal_pin,
159 bch2_btree_write_buffer_journal_flush);
160 bch2_journal_pin_drop(j, &wb->journal_pin);
/* Switch buffers and drain the array selected by the returned index. */
162 s = btree_write_buffer_switch(wb);
163 keys = wb->keys[s.idx];
167 * We first sort so that we can detect and skip redundant updates, and
168 * then we attempt to flush in sorted btree order, as this is most
171 * However, since we're not flushing in the order they appear in the
172 * journal we won't be able to drop our journal pin until everything is
173 * flushed - which means this could deadlock the journal if we weren't
174 * passing BCH_TRANS_COMMIT_journal_reclaim. This causes the update to fail
175 * if it would block taking a journal reservation.
177 * If that happens, simply skip the key so we can optimistically insert
178 * as many keys as possible in the fast path.
180 sort(keys, nr, sizeof(keys[0]),
181 btree_write_buffered_key_cmp, NULL);
/* First pass: walk the keys in btree and position order. */
183 for (i = keys; i < keys + nr; i++) {
184 if (i + 1 < keys + nr &&
185 i[0].btree == i[1].btree &&
186 bpos_eq(i[0].k.k.p, i[1].k.k.p)) {
/*
 * The held leaf write lock is released when the next key belongs to another
 * btree or lies past the end of the locked leaf.
 */
193 (iter.path->btree_id != i->btree ||
194 bpos_gt(i->k.k.p, iter.path->l[0].b->key.k.p))) {
195 bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
196 write_locked = false;
/* A new iterator is set up whenever there is none yet or the btree changes. */
199 if (!iter.path || iter.path->btree_id != i->btree) {
200 bch2_trans_iter_exit(trans, &iter);
201 bch2_trans_iter_init(trans, &iter, i->btree, i->k.k.p,
202 BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS);
205 bch2_btree_iter_set_pos(&iter, i->k.k.p);
206 iter.path->preserve = false;
210 ret = -BCH_ERR_journal_reclaim_would_deadlock;
/* The flush of this key is retried for as long as ret is a transaction restart. */
214 ret = wb_flush_one(trans, &iter, i, &write_locked, &fast);
216 bch2_trans_begin(trans);
217 } while (bch2_err_matches(ret, BCH_ERR_transaction_restart));
221 } else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
229 bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
230 bch2_trans_iter_exit(trans, &iter);
237 * Flush in the order they were present in the journal, so that
238 * we can release journal pins:
239 * The fastpath zapped the seq of keys that were successfully flushed so
240 * we can skip those here.
242 trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, nr);
/* Second pass: re-sort by journal_seq and insert through full commits. */
244 sort(keys, nr, sizeof(keys[0]),
245 btree_write_buffered_journal_cmp,
248 for (i = keys; i < keys + nr; i++) {
252 bch2_journal_pin_update(j, i->journal_seq, &pin,
253 bch2_btree_write_buffer_journal_flush);
255 ret = commit_do(trans, NULL, NULL,
256 BCH_WATERMARK_reclaim|
257 BCH_TRANS_COMMIT_no_check_rw|
258 BCH_TRANS_COMMIT_no_enospc|
259 BCH_TRANS_COMMIT_no_journal_res|
260 BCH_TRANS_COMMIT_journal_reclaim,
261 btree_write_buffered_insert(trans, i));
/* Common exit: report a failure, emit the trace event, release the local pin. */
267 bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret));
268 trace_write_buffer_flush(trans, nr, skipped, fast, wb->size);
269 bch2_journal_pin_drop(j, &pin);
/*
 * Flush the write buffer, waiting for flush_lock if necessary.
 *
 * Fails with -BCH_ERR_erofs_no_writes when the btree_write_buffer write ref
 * cannot be taken. The transaction is unlocked before flush_lock is acquired.
 * The lock and the write ref are both released after the flush.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the result of the flush is returned, TODO confirm.
 */
273 int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
275 struct bch_fs *c = trans->c;
277 if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer))
278 return -BCH_ERR_erofs_no_writes;
280 trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);
282 bch2_trans_unlock(trans);
283 mutex_lock(&c->btree_write_buffer.flush_lock);
284 int ret = bch2_btree_write_buffer_flush_locked(trans);
285 mutex_unlock(&c->btree_write_buffer.flush_lock);
286 bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
/*
 * Flush the write buffer only if flush_lock can be taken without waiting
 * (mutex_trylock). No write ref is taken here;
 * bch2_btree_write_buffer_tryflush() wraps this call with one.
 *
 * NOTE(review): the declaration of ret and the return statement are not
 * visible in this listing, so the result when the lock is contended cannot be
 * told from here.
 */
290 int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
292 struct bch_fs *c = trans->c;
293 struct btree_write_buffer *wb = &c->btree_write_buffer;
296 if (mutex_trylock(&wb->flush_lock)) {
297 ret = bch2_btree_write_buffer_flush_locked(trans);
298 mutex_unlock(&wb->flush_lock);
/*
 * Non-blocking flush attempt guarded by a write ref.
 *
 * Fails with -BCH_ERR_erofs_no_writes when the btree_write_buffer write ref
 * cannot be taken; otherwise calls bch2_btree_write_buffer_flush_nocheck_rw()
 * and releases the ref afterwards.
 */
304 int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
306 struct bch_fs *c = trans->c;
308 if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer))
309 return -BCH_ERR_erofs_no_writes;
311 int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
312 bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
/*
 * Journal pin flush callback; this is the function passed to the journal pin
 * helpers elsewhere in this file.
 *
 * Recovers the filesystem from the embedded journal with container_of(), then
 * runs bch2_btree_write_buffer_flush_locked() through bch2_trans_run() while
 * holding flush_lock. _pin and seq are not referenced in the visible lines.
 */
316 static int bch2_btree_write_buffer_journal_flush(struct journal *j,
317 struct journal_entry_pin *_pin, u64 seq)
319 struct bch_fs *c = container_of(j, struct bch_fs, journal);
320 struct btree_write_buffer *wb = &c->btree_write_buffer;
322 mutex_lock(&wb->flush_lock);
323 int ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
324 mutex_unlock(&wb->flush_lock);
/*
 * Build a u64 from a btree_write_buffer_state compound literal for the given
 * index. The insert path adds this value to the packed state word to take a
 * reference and subtracts it again to drop the reference.
 *
 * NOTE(review): the fields of the compound literal are not visible in this
 * listing; presumably it sets the ref field matching idx to one, TODO confirm.
 */
329 static inline u64 btree_write_buffer_ref(int idx)
331 return ((union btree_write_buffer_state) {
/*
 * Copy the write buffer updates of a transaction into the active write buffer.
 *
 * Visible steps:
 *  - stamp every pending update with the seq and offset of the journal
 *    reservation held by the transaction;
 *  - reserve space with a cmpxchg retry loop on the packed state word, taking
 *    a reference on the active index and adding nr_wb_updates to nr; when the
 *    new count would exceed wb->size, ret is set to
 *    -BCH_ERR_btree_insert_need_flush_buffer;
 *  - memcpy nr_wb_updates entries into wb->keys[new.idx] starting at slot
 *    old.nr;
 *  - add a journal pin on wb->journal_pin for the seq of the transaction;
 *  - drop the reference with release ordering.
 *
 * NOTE(review): the declarations of ret and v, the start of the retry loop,
 * the handling after the overflow check, the memcpy source argument and the
 * return statement are not visible in this listing.
 */
337 int bch2_btree_insert_keys_write_buffer(struct btree_trans *trans)
339 struct bch_fs *c = trans->c;
340 struct btree_write_buffer *wb = &c->btree_write_buffer;
341 struct btree_write_buffered_key *i;
342 union btree_write_buffer_state old, new;
346 trans_for_each_wb_update(trans, i) {
347 EBUG_ON(i->k.k.u64s > BTREE_WRITE_BUFERED_U64s_MAX);
349 i->journal_seq = trans->journal_res.seq;
350 i->journal_offset = trans->journal_res.offset;
354 v = READ_ONCE(wb->state.v);
358 new.v += btree_write_buffer_ref(new.idx);
359 new.nr += trans->nr_wb_updates;
360 if (new.nr > wb->size) {
361 ret = -BCH_ERR_btree_insert_need_flush_buffer;
364 } while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
366 memcpy(wb->keys[new.idx] + old.nr,
368 sizeof(trans->wb_updates[0]) * trans->nr_wb_updates);
370 bch2_journal_pin_add(&c->journal, trans->journal_res.seq, &wb->journal_pin,
371 bch2_btree_write_buffer_journal_flush);
373 atomic64_sub_return_release(btree_write_buffer_ref(new.idx), &wb->state.counter);
/*
 * Teardown hook for the write buffer. Asserts that no keys remain buffered
 * unless the journal is in an error state.
 *
 * NOTE(review): no release of wb->keys[] is visible in this listing;
 * presumably the arrays allocated by the init function are freed here, TODO
 * confirm.
 */
379 void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
381 struct btree_write_buffer *wb = &c->btree_write_buffer;
383 BUG_ON(wb->state.nr && !bch2_journal_error(&c->journal));
/*
 * Set up the write buffer: initialise flush_lock, take the buffer size from
 * the btree_write_buffer_size option and allocate both key arrays with that
 * many entries each.
 *
 * Returns -BCH_ERR_ENOMEM_fs_btree_write_buffer_init when either allocation
 * fails. On that path an array that was allocated is not released here.
 *
 * NOTE(review): the success return is not visible in this listing.
 */
389 int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
391 struct btree_write_buffer *wb = &c->btree_write_buffer;
393 mutex_init(&wb->flush_lock);
394 wb->size = c->opts.btree_write_buffer_size;
396 wb->keys[0] = kvmalloc_array(wb->size, sizeof(*wb->keys[0]), GFP_KERNEL);
397 wb->keys[1] = kvmalloc_array(wb->size, sizeof(*wb->keys[1]), GFP_KERNEL);
398 if (!wb->keys[0] || !wb->keys[1])
399 return -BCH_ERR_ENOMEM_fs_btree_write_buffer_init;