1 // SPDX-License-Identifier: GPL-2.0
4 #include "btree_locking.h"
5 #include "btree_update.h"
6 #include "btree_update_interior.h"
7 #include "btree_write_buffer.h"
10 #include "journal_reclaim.h"
12 #include <linux/sort.h>
14 static int btree_write_buffered_key_cmp(const void *_l, const void *_r)
16 const struct btree_write_buffered_key *l = _l;
17 const struct btree_write_buffered_key *r = _r;
19 return cmp_int(l->btree, r->btree) ?:
20 bpos_cmp(l->k.k.p, r->k.k.p) ?:
21 cmp_int(l->journal_seq, r->journal_seq) ?:
22 cmp_int(l->journal_offset, r->journal_offset);
25 static int btree_write_buffered_journal_cmp(const void *_l, const void *_r)
27 const struct btree_write_buffered_key *l = _l;
28 const struct btree_write_buffered_key *r = _r;
30 return cmp_int(l->journal_seq, r->journal_seq);
/*
 * Try to flush one write-buffered key: take the leaf node's write lock and
 * insert the key directly if it fits (the fast path); otherwise drop the
 * write lock and fall back to a regular transaction update + commit.
 *
 * NOTE(review): parts of this function (the tail of the parameter list,
 * error checks after traverse/lock, and some braces) are on lines not
 * visible in this chunk — the visible statements are not contiguous.
 */
static int bch2_btree_write_buffer_flush_one(struct btree_trans *trans,
					     struct btree_iter *iter,
					     struct btree_write_buffered_key *wb,
					     unsigned commit_flags,
	struct bch_fs *c = trans->c;
	struct btree_path *path;

	ret = bch2_btree_iter_traverse(iter);

	/* Take the leaf node write lock for the direct-insert fast path: */
	ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);

	bch2_btree_node_prep_for_write(trans, path, path->l[0].b);

	/*
	 * Key doesn't fit in the node: release the write lock and fall
	 * through to the transaction-commit slow path below.
	 */
	if (!bch2_btree_node_insert_fits(c, path->l[0].b, wb->k.k.u64s)) {
		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
		*write_locked = false;

	/* Fast path: insert straight into the leaf, bypassing trans commit: */
	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);

	 * We can't clone a path that has write locks: if the path is
	 * shared, unlock before set_pos(), traverse():
	bch2_btree_node_unlock_write(trans, path, path->l[0].b);
	*write_locked = false;

	/* Slow path: regular update + commit, failing instead of blocking
	 * on a journal reservation (JOURNAL_RECLAIM): */
	return bch2_trans_update(trans, iter, &wb->k, 0) ?:
		bch2_trans_commit(trans, NULL, NULL,
				  BTREE_INSERT_JOURNAL_RECLAIM);
/*
 * Atomically swap which of the two key buffers incoming updates are
 * appended to, returning the prior state (the buffer to be flushed).
 * The body of the cmpxchg loop is on elided lines — presumably it toggles
 * state.idx and resets the count; TODO confirm against the full file.
 */
static union btree_write_buffer_state btree_write_buffer_switch(struct btree_write_buffer *wb)
	union btree_write_buffer_state old, new;
	u64 v = READ_ONCE(wb->state.v);

	/* Retry until the state word is swapped without interference: */
	} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);

	/* Wait for in-flight writers still holding a ref on the old buffer: */
	while (old.idx == 0 ? wb->state.ref0 : wb->state.ref1)
/*
 * Flush all buffered keys to their btrees.
 *
 * Fast path: sort by (btree, pos, journal order), skip keys superseded by a
 * later update to the same position, and insert each survivor directly into
 * its leaf node via bch2_btree_write_buffer_flush_one(), holding the node
 * write lock across consecutive keys that land in the same node.
 *
 * Slow path (taken when the fast path would deadlock journal reclaim):
 * re-sort the remaining keys in journal order and commit them one by one,
 * moving the journal pin forward as we go.
 *
 * @locked: caller already holds wb->flush_lock; either way this function
 * releases it before returning.
 */
int __bch2_btree_write_buffer_flush(struct btree_trans *trans, unsigned commit_flags,
	struct bch_fs *c = trans->c;
	struct journal *j = &c->journal;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct journal_entry_pin pin;
	struct btree_write_buffered_key *i, *dst, *keys;
	struct btree_iter iter = { NULL };
	size_t nr = 0, skipped = 0, fast = 0;
	bool write_locked = false;
	union btree_write_buffer_state s;

	memset(&pin, 0, sizeof(pin));

	/* Best-effort when not already locked: bail if someone else is flushing: */
	if (!locked && !mutex_trylock(&wb->flush_lock))

	/* Take over the write buffer's journal pin for the duration of the flush: */
	bch2_journal_pin_copy(j, &pin, &wb->journal_pin, NULL);
	bch2_journal_pin_drop(j, &wb->journal_pin);

	/* Switch buffers so new updates go to the other one while we flush: */
	s = btree_write_buffer_switch(wb);
	keys = wb->keys[s.idx];

	 * We first sort so that we can detect and skip redundant updates, and
	 * then we attempt to flush in sorted btree order, as this is most
	 * However, since we're not flushing in the order they appear in the
	 * journal we won't be able to drop our journal pin until everything is
	 * flushed - which means this could deadlock the journal, if we weren't
	 * passing BTREE_INSERT_JOURNAL_RECLAIM. This causes the update to fail
	 * if it would block taking a journal reservation.
	 * If that happens, we sort them by the order they appeared in the
	 * journal - after dropping redundant entries - and then restart
	 * flushing, this time dropping journal pins as we go.
	sort(keys, nr, sizeof(keys[0]),
	     btree_write_buffered_key_cmp, NULL);

	for (i = keys; i < keys + nr; i++) {
		/* Skip keys made redundant by a later update to the same pos: */
		if (i + 1 < keys + nr &&
		    i[0].btree == i[1].btree &&
		    bpos_eq(i[0].k.k.p, i[1].k.k.p)) {
		/* Next key falls outside the write-locked node: drop the lock: */
		    (iter.path->btree_id != i->btree ||
		     bpos_gt(i->k.k.p, iter.path->l[0].b->key.k.p))) {
			bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
			write_locked = false;

		/* New btree: recreate the iterator: */
		if (!iter.path || iter.path->btree_id != i->btree) {
			bch2_trans_iter_exit(trans, &iter);
			bch2_trans_iter_init(trans, &iter, i->btree, i->k.k.p, BTREE_ITER_INTENT);

		bch2_btree_iter_set_pos(&iter, i->k.k.p);
		iter.path->preserve = false;

		ret = bch2_btree_write_buffer_flush_one(trans, &iter, i,
							commit_flags, &write_locked, &fast);
			bch2_trans_begin(trans);
		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));

		bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
	bch2_trans_iter_exit(trans, &iter);

	trace_write_buffer_flush(trans, nr, skipped, fast, wb->size);

	/* Fast path would deadlock journal reclaim: go to the slow path: */
	if (ret == -BCH_ERR_journal_reclaim_would_deadlock)

	bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret));

	bch2_journal_pin_drop(j, &pin);
	mutex_unlock(&wb->flush_lock);

	/* Slow path: flush remaining keys in journal order: */
	trace_write_buffer_flush_slowpath(trans, i - keys, nr);

	/* Drop redundant entries (same skip rule as the fast path): */
	for (; i < keys + nr; i++) {
		if (i + 1 < keys + nr &&
		    i[0].btree == i[1].btree &&
		    bpos_eq(i[0].k.k.p, i[1].k.k.p))

	sort(keys, nr, sizeof(keys[0]),
	     btree_write_buffered_journal_cmp,

	for (i = keys; i < keys + nr; i++) {
		/*
		 * Advance our pin to this key's journal seq so earlier journal
		 * entries can be reclaimed while we continue flushing; pin2 is
		 * a temporary so there is never a window with no pin held.
		 */
		if (i->journal_seq > pin.seq) {
			struct journal_entry_pin pin2;

			memset(&pin2, 0, sizeof(pin2));

			bch2_journal_pin_add(j, i->journal_seq, &pin2, NULL);
			bch2_journal_pin_drop(j, &pin);
			bch2_journal_pin_copy(j, &pin, &pin2, NULL);
			bch2_journal_pin_drop(j, &pin2);

		ret = commit_do(trans, NULL, NULL,
				BTREE_INSERT_JOURNAL_RECLAIM|
				JOURNAL_WATERMARK_reserved,
				__bch2_btree_insert(trans, i->btree, &i->k, 0));
		if (bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret)))
242 int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
244 bch2_trans_unlock(trans);
245 mutex_lock(&trans->c->btree_write_buffer.flush_lock);
246 return __bch2_btree_write_buffer_flush(trans, 0, true);
249 int bch2_btree_write_buffer_flush(struct btree_trans *trans)
251 return __bch2_btree_write_buffer_flush(trans, 0, false);
/*
 * Journal reclaim callback (installed via bch2_journal_pin_add): flushes the
 * write buffer so the pinned journal sequence can be released.
 *
 * Note the asymmetric locking: flush_lock is taken here and released inside
 * __bch2_btree_write_buffer_flush() (called with locked == true).
 */
static int bch2_btree_write_buffer_journal_flush(struct journal *j,
				struct journal_entry_pin *_pin, u64 seq)
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	mutex_lock(&wb->flush_lock);

	return bch2_trans_run(c,
		__bch2_btree_write_buffer_flush(&trans, BTREE_INSERT_NOCHECK_RW, true));
/*
 * Build a state-word delta carrying one writer reference for buffer @idx.
 * The initializer fields are on elided lines — presumably sets ref0/ref1
 * depending on idx; TODO confirm against the full file.
 */
static inline u64 btree_write_buffer_ref(int idx)
	return ((union btree_write_buffer_state) {
/*
 * Move a committing transaction's buffered updates into the global write
 * buffer: stamp each key with its journal seq/offset, atomically reserve
 * space (taking a writer ref so a concurrent buffer switch waits for us),
 * copy the keys in, pin the journal entry, then drop the ref.
 *
 * Returns -BCH_ERR_btree_insert_need_flush_buffer when the buffer is full.
 */
int bch2_btree_insert_keys_write_buffer(struct btree_trans *trans)
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_write_buffered_key *i;
	union btree_write_buffer_state old, new;

	/* Record journal position so flushing can order/dedup by it later: */
	trans_for_each_wb_update(trans, i) {
		EBUG_ON(i->k.k.u64s > BTREE_WRITE_BUFERED_U64s_MAX);

		i->journal_seq = trans->journal_res.seq;
		i->journal_offset = trans->journal_res.offset;

	v = READ_ONCE(wb->state.v);

	/* Take a ref on the current buffer and reserve room for our keys: */
	new.v += btree_write_buffer_ref(new.idx);
	new.nr += trans->nr_wb_updates;
	if (new.nr > wb->size) {
		ret = -BCH_ERR_btree_insert_need_flush_buffer;

	} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);

	/* Copy keys into the slots we reserved (old.nr .. new.nr): */
	memcpy(wb->keys[new.idx] + old.nr,
	       sizeof(trans->wb_updates[0]) * trans->nr_wb_updates);

	/* Pin the journal entry until these keys are flushed to their btrees: */
	bch2_journal_pin_add(&c->journal, trans->journal_res.seq, &wb->journal_pin,
			     bch2_btree_write_buffer_journal_flush);

	/* Release the writer ref (release ordering pairs with the switch): */
	atomic64_sub_return_release(btree_write_buffer_ref(new.idx), &wb->state.counter);
/*
 * Tear down the write buffer on filesystem shutdown. Unflushed keys at this
 * point are a bug unless the journal is already in an error state.
 * (The frees of wb->keys[] are on elided lines.)
 */
void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	BUG_ON(wb->state.nr && !bch2_journal_error(&c->journal));
/*
 * Allocate the two key buffers (double-buffered: one fills while the other
 * flushes), sized by the btree_write_buffer_size option.
 * Returns nonzero on allocation failure (error path is on elided lines).
 */
int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	mutex_init(&wb->flush_lock);
	wb->size = c->opts.btree_write_buffer_size;

	wb->keys[0] = kvmalloc_array(wb->size, sizeof(*wb->keys[0]), GFP_KERNEL);
	wb->keys[1] = kvmalloc_array(wb->size, sizeof(*wb->keys[1]), GFP_KERNEL);
	if (!wb->keys[0] || !wb->keys[1])