*/
#include "bcachefs.h"
-#include "alloc.h"
+#include "alloc_foreground.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "buckets.h"
test_bit(JOURNAL_NEED_WRITE, &j->flags))
bch2_time_stats_update(j->delay_time,
j->need_write_time);
-#if 0
- closure_call(&j->io, bch2_journal_write, NULL, NULL);
-#else
- /* Shut sparse up: */
- closure_init(&j->io, NULL);
- set_closure_fn(&j->io, bch2_journal_write, NULL);
- bch2_journal_write(&j->io);
-#endif
+
+ closure_call(&j->io, bch2_journal_write, system_highpri_wq, NULL);
}
static void journal_pin_new_entry(struct journal *j, int count)
return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
}
+/*
+ * Returns true if the journal entry contains no keys/entries and its
+ * last_seq has caught up to its seq - i.e. writing it out would record
+ * nothing new:
+ */
+static inline bool journal_entry_empty(struct jset *j)
+{
+	struct jset_entry *i;
+
+	if (j->seq != j->last_seq)
+		return false;
+
+	vstruct_for_each(j, i)
+		if (i->type || i->u64s)
+			return false;
+	return true;
+}
+
static enum {
JOURNAL_ENTRY_ERROR,
JOURNAL_ENTRY_INUSE,
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
- struct journal_buf *buf;
+ struct journal_buf *buf = journal_cur_buf(j);
union journal_res_state old, new;
u64 v = atomic64_read(&j->reservations.counter);
if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
return JOURNAL_ENTRY_CLOSED;
- if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+ if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) {
+ /* this entry will never be written: */
+ closure_wake_up(&buf->wait);
return JOURNAL_ENTRY_ERROR;
+ }
if (new.prev_buf_unwritten)
return JOURNAL_ENTRY_INUSE;
clear_bit(JOURNAL_NEED_WRITE, &j->flags);
- buf = &j->buf[old.idx];
buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
j->prev_buf_sectors =
c->opts.block_size;
BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
+ bkey_extent_init(&buf->key);
+
+ /*
+ * We have to set last_seq here, _before_ opening a new journal entry:
+ *
+	 * A thread may replace an old pin with a new pin on its current
+ * journal reservation - the expectation being that the journal will
+ * contain either what the old pin protected or what the new pin
+ * protects.
+ *
+ * After the old pin is dropped journal_last_seq() won't include the old
+ * pin, so we can only write the updated last_seq on the entry that
+ * contains whatever the new pin protects.
+ *
+ * Restated, we can _not_ update last_seq for a given entry if there
+ * could be a newer entry open with reservations/pins that have been
+ * taken against it.
+ *
+ * Hence, we want update/set last_seq on the current journal entry right
+ * before we open a new one:
+ */
bch2_journal_reclaim_fast(j);
- /* XXX: why set this here, and not in bch2_journal_write()? */
buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
+ if (journal_entry_empty(buf->data))
+ clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
+ else
+ set_bit(JOURNAL_NOT_EMPTY, &j->flags);
+
journal_pin_new_entry(j, 1);
bch2_journal_buf_init(j);
cancel_delayed_work(&j->write_work);
spin_unlock(&j->lock);
- if (c->bucket_journal_seq > 1 << 14) {
- c->bucket_journal_seq = 0;
- bch2_bucket_seq_cleanup(c);
- }
-
- c->bucket_journal_seq++;
-
/* ugh - might be called from __journal_res_get() under wait_event() */
__set_current_state(TASK_RUNNING);
bch2_journal_buf_put(j, old.idx, need_write_just_set);
return 1;
}
-/*
- * returns true if there's nothing to flush and no journal write still in flight
- */
-static bool journal_flush_write(struct journal *j)
+static bool __journal_entry_close(struct journal *j)
{
- bool ret;
-
- spin_lock(&j->lock);
- ret = !j->reservations.prev_buf_unwritten;
+ bool set_need_write;
if (!journal_entry_is_open(j)) {
spin_unlock(&j->lock);
- return ret;
+ return true;
}
- set_bit(JOURNAL_NEED_WRITE, &j->flags);
- if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
- ret = false;
- else
+ set_need_write = !test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags);
+ if (set_need_write)
+ j->need_write_time = local_clock();
+
+ switch (journal_buf_switch(j, set_need_write)) {
+ case JOURNAL_ENTRY_INUSE:
spin_unlock(&j->lock);
- return ret;
+ return false;
+ default:
+ spin_unlock(&j->lock);
+ case JOURNAL_UNLOCKED:
+ return true;
+ }
+}
+
+static bool journal_entry_close(struct journal *j)
+{
+	spin_lock(&j->lock);
+	/* __journal_entry_close() drops j->lock before returning */
+	return __journal_entry_close(j);
}
static void journal_write_work(struct work_struct *work)
{
struct journal *j = container_of(work, struct journal, write_work.work);
- journal_flush_write(j);
+ journal_entry_close(j);
}
/*
}
static int __journal_res_get(struct journal *j, struct journal_res *res,
- unsigned u64s_min, unsigned u64s_max)
+ unsigned flags)
{
struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_buf *buf;
int ret;
retry:
- ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
- if (ret)
- return ret;
+ if (journal_res_get_fast(j, res, flags))
+ return 0;
spin_lock(&j->lock);
/*
* that just did journal_entry_open() and call journal_entry_close()
* unnecessarily
*/
- ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
- if (ret) {
+ if (journal_res_get_fast(j, res, flags)) {
spin_unlock(&j->lock);
- return 1;
+ return 0;
}
/*
spin_unlock(&j->lock);
return -EROFS;
case JOURNAL_ENTRY_INUSE:
- /* haven't finished writing out the previous one: */
+ /*
+ * The current journal entry is still open, but we failed to get
+ * a journal reservation because there's not enough space in it,
+ * and we can't close it and start another because we haven't
+ * finished writing out the previous entry:
+ */
spin_unlock(&j->lock);
trace_journal_entry_full(c);
goto blocked;
blocked:
if (!j->res_get_blocked_start)
j->res_get_blocked_start = local_clock() ?: 1;
- return 0;
+ return -EAGAIN;
}
/*
* btree node write locks.
*/
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
- unsigned u64s_min, unsigned u64s_max)
+ unsigned flags)
{
int ret;
wait_event(j->wait,
- (ret = __journal_res_get(j, res, u64s_min,
- u64s_max)));
- return ret < 0 ? ret : 0;
+ (ret = __journal_res_get(j, res, flags)) != -EAGAIN ||
+ (flags & JOURNAL_RES_GET_NONBLOCK));
+ return ret;
}
u64 bch2_journal_last_unwritten_seq(struct journal *j)
* btree root - every journal entry contains the roots of all the btrees, so it
* doesn't need to bother with getting a journal reservation
*/
-int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
+int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *cl)
{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
int ret;
-
+retry:
spin_lock(&j->lock);
- BUG_ON(seq > journal_cur_seq(j));
if (seq < journal_cur_seq(j) ||
journal_entry_is_open(j)) {
spin_unlock(&j->lock);
- return 1;
+ return 0;
}
+ if (journal_cur_seq(j) < seq) {
+ switch (journal_buf_switch(j, false)) {
+ case JOURNAL_ENTRY_ERROR:
+ spin_unlock(&j->lock);
+ return -EROFS;
+ case JOURNAL_ENTRY_INUSE:
+ /* haven't finished writing out the previous one: */
+ trace_journal_entry_full(c);
+ goto blocked;
+ case JOURNAL_ENTRY_CLOSED:
+ break;
+ case JOURNAL_UNLOCKED:
+ goto retry;
+ }
+ }
+
+ BUG_ON(journal_cur_seq(j) < seq);
+
ret = journal_entry_open(j);
- if (!ret)
- closure_wait(&j->async_wait, parent);
+ if (ret) {
+ spin_unlock(&j->lock);
+ return ret < 0 ? ret : 0;
+ }
+blocked:
+ if (!j->res_get_blocked_start)
+ j->res_get_blocked_start = local_clock() ?: 1;
+
+ closure_wait(&j->async_wait, cl);
spin_unlock(&j->lock);
- if (!ret)
- bch2_journal_reclaim_work(&j->reclaim_work.work);
+ bch2_journal_reclaim_work(&j->reclaim_work.work);
+ return -EAGAIN;
+}
+
+/*
+ * Returns an error code if journal entry @seq can no longer be written:
+ * the journal as a whole errored while it was the open entry, or its
+ * write completed without it reaching disk (seq > j->seq_ondisk).
+ */
+static int journal_seq_error(struct journal *j, u64 seq)
+{
+	union journal_res_state state = READ_ONCE(j->reservations);
-	return ret;
+	/* @seq is the currently open entry: report overall journal error */
+	if (seq == journal_cur_seq(j))
+		return bch2_journal_error(j);
+
+	/* write for @seq finished, but it never made it to disk: */
+	if (seq + 1 == journal_cur_seq(j) &&
+	    !state.prev_buf_unwritten &&
+	    seq > j->seq_ondisk)
+		return -EIO;
+
+	return 0;
+}
+
+/*
+ * Map journal sequence number @seq to its in-memory buffer: the current
+ * open buf, the previous buf if its write is still in flight, or NULL if
+ * that entry has already been written out. Call sites in this file all
+ * invoke this under j->lock.
+ */
+static inline struct journal_buf *
+journal_seq_to_buf(struct journal *j, u64 seq)
+{
+	/* seq should be for a journal entry that has been opened: */
+	BUG_ON(seq > journal_cur_seq(j));
+	BUG_ON(seq == journal_cur_seq(j) &&
+	       j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
+
+	if (seq == journal_cur_seq(j))
+		return journal_cur_buf(j);
+	if (seq + 1 == journal_cur_seq(j) &&
+	    j->reservations.prev_buf_unwritten)
+		return journal_prev_buf(j);
+	return NULL;
+}
/**
* can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
* configurable).
*/
-void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
+void bch2_journal_wait_on_seq(struct journal *j, u64 seq,
+ struct closure *parent)
{
- spin_lock(&j->lock);
-
- BUG_ON(seq > journal_cur_seq(j));
+ struct journal_buf *buf;
- if (bch2_journal_error(j)) {
- spin_unlock(&j->lock);
- return;
- }
+ spin_lock(&j->lock);
- if (seq == journal_cur_seq(j)) {
- if (!closure_wait(&journal_cur_buf(j)->wait, parent))
- BUG();
- } else if (seq + 1 == journal_cur_seq(j) &&
- j->reservations.prev_buf_unwritten) {
- if (!closure_wait(&journal_prev_buf(j)->wait, parent))
+ if ((buf = journal_seq_to_buf(j, seq))) {
+ if (!closure_wait(&buf->wait, parent))
BUG();
- smp_mb();
-
- /* check if raced with write completion (or failure) */
- if (!j->reservations.prev_buf_unwritten ||
- bch2_journal_error(j))
- closure_wake_up(&journal_prev_buf(j)->wait);
+ if (seq == journal_cur_seq(j)) {
+ smp_mb();
+ if (bch2_journal_error(j))
+ closure_wake_up(&buf->wait);
+ }
}
spin_unlock(&j->lock);
* like bch2_journal_wait_on_seq, except that it triggers a write immediately if
* necessary
*/
-void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
+void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
+ struct closure *parent)
{
struct journal_buf *buf;
spin_lock(&j->lock);
- BUG_ON(seq > journal_cur_seq(j));
-
- if (bch2_journal_error(j)) {
- spin_unlock(&j->lock);
- return;
- }
-
- if (seq == journal_cur_seq(j)) {
- bool set_need_write = false;
-
- buf = journal_cur_buf(j);
-
- if (parent && !closure_wait(&buf->wait, parent))
- BUG();
-
- if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
- j->need_write_time = local_clock();
- set_need_write = true;
- }
-
- switch (journal_buf_switch(j, set_need_write)) {
- case JOURNAL_ENTRY_ERROR:
- if (parent)
- closure_wake_up(&buf->wait);
- break;
- case JOURNAL_ENTRY_CLOSED:
- /*
- * Journal entry hasn't been opened yet, but caller
- * claims it has something
- */
- BUG();
- case JOURNAL_ENTRY_INUSE:
- break;
- case JOURNAL_UNLOCKED:
- return;
- }
- } else if (parent &&
- seq + 1 == journal_cur_seq(j) &&
- j->reservations.prev_buf_unwritten) {
- buf = journal_prev_buf(j);
-
+ if (parent &&
+ (buf = journal_seq_to_buf(j, seq)))
if (!closure_wait(&buf->wait, parent))
BUG();
- smp_mb();
-
- /* check if raced with write completion (or failure) */
- if (!j->reservations.prev_buf_unwritten ||
- bch2_journal_error(j))
- closure_wake_up(&buf->wait);
- }
-
- spin_unlock(&j->lock);
+ if (seq == journal_cur_seq(j))
+ __journal_entry_close(j);
+ else
+ spin_unlock(&j->lock);
}
static int journal_seq_flushed(struct journal *j, u64 seq)
{
- struct journal_buf *buf;
- int ret = 1;
+ int ret;
spin_lock(&j->lock);
- BUG_ON(seq > journal_cur_seq(j));
+ ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq);
- if (seq == journal_cur_seq(j)) {
- bool set_need_write = false;
-
- ret = 0;
-
- buf = journal_cur_buf(j);
-
- if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
- j->need_write_time = local_clock();
- set_need_write = true;
- }
-
- switch (journal_buf_switch(j, set_need_write)) {
- case JOURNAL_ENTRY_ERROR:
- ret = -EIO;
- break;
- case JOURNAL_ENTRY_CLOSED:
- /*
- * Journal entry hasn't been opened yet, but caller
- * claims it has something
- */
- BUG();
- case JOURNAL_ENTRY_INUSE:
- break;
- case JOURNAL_UNLOCKED:
- return 0;
- }
- } else if (seq + 1 == journal_cur_seq(j) &&
- j->reservations.prev_buf_unwritten) {
- ret = bch2_journal_error(j);
- }
-
- spin_unlock(&j->lock);
+ if (seq == journal_cur_seq(j))
+ __journal_entry_close(j);
+ else
+ spin_unlock(&j->lock);
return ret;
}
void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{
struct journal_res res;
- unsigned u64s = jset_u64s(0);
memset(&res, 0, sizeof(res));
- bch2_journal_res_get(j, &res, u64s, u64s);
+ bch2_journal_res_get(j, &res, jset_u64s(0), 0);
bch2_journal_res_put(j, &res);
bch2_journal_flush_seq_async(j, res.seq, parent);
int bch2_journal_meta(struct journal *j)
{
struct journal_res res;
- unsigned u64s = jset_u64s(0);
int ret;
memset(&res, 0, sizeof(res));
- ret = bch2_journal_res_get(j, &res, u64s, u64s);
+ ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
if (ret)
return ret;
if (!journal_buckets)
goto err;
+ /*
+ * We may be called from the device add path, before the new device has
+ * actually been added to the running filesystem:
+ */
if (c)
spin_lock(&c->journal.lock);
goto err;
}
} else {
- int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
- if (ob_idx < 0) {
+ ob = bch2_bucket_alloc(c, ca, RESERVE_ALLOC,
+ false, cl);
+ if (IS_ERR(ob)) {
ret = cl ? -EAGAIN : -ENOSPC;
goto err;
}
- ob = c->open_buckets + ob_idx;
bucket = sector_to_bucket(ca, ob->ptr.offset);
}
- if (c)
+ if (c) {
+ percpu_down_read_preempt_disable(&c->mark_lock);
spin_lock(&c->journal.lock);
+ } else {
+ preempt_disable();
+ }
__array_insert_item(ja->buckets, ja->nr, ja->last_idx);
__array_insert_item(ja->bucket_seq, ja->nr, ja->last_idx);
}
ja->nr++;
- if (c)
- spin_unlock(&c->journal.lock);
-
bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
ca->mi.bucket_size,
gc_phase(GC_PHASE_SB),
- new_fs
- ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
- : 0);
+ 0);
+
+ if (c) {
+ spin_unlock(&c->journal.lock);
+ percpu_up_read_preempt_enable(&c->mark_lock);
+ } else {
+ preempt_enable();
+ }
if (!new_fs)
bch2_open_bucket_put(c, ob);
void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
- spin_lock(&j->lock);
- bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
- spin_unlock(&j->lock);
-
wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}
void bch2_fs_journal_stop(struct journal *j)
{
- wait_event(j->wait, journal_flush_write(j));
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+
+ wait_event(j->wait, journal_entry_close(j));
+
+ /* do we need to write another journal entry? */
+ if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
+ c->btree_roots_dirty)
+ bch2_journal_meta(j);
+
+ BUG_ON(journal_entry_is_open(j) ||
+ j->reservations.prev_buf_unwritten);
+
+ BUG_ON(!bch2_journal_error(j) &&
+ test_bit(JOURNAL_NOT_EMPTY, &j->flags));
cancel_delayed_work_sync(&j->write_work);
cancel_delayed_work_sync(&j->reclaim_work);
void bch2_fs_journal_start(struct journal *j)
{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
struct journal_seq_blacklist *bl;
u64 blacklist = 0;
journal_pin_new_entry(j, 1);
bch2_journal_buf_init(j);
+ c->last_bucket_seq_cleanup = journal_cur_seq(j);
+
spin_unlock(&j->lock);
/*
init_waitqueue_head(&j->wait);
INIT_DELAYED_WORK(&j->write_work, journal_write_work);
INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
+ init_waitqueue_head(&j->pin_flush_wait);
mutex_init(&j->blacklist_lock);
INIT_LIST_HEAD(&j->seq_blacklist);
mutex_init(&j->reclaim_lock);
j->write_delay_ms = 1000;
j->reclaim_delay_ms = 100;
- bkey_extent_init(&j->key);
-
atomic64_set(&j->reservations.counter,
((union journal_res_state)
{ .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
{
+ struct printbuf out = _PBUF(buf, PAGE_SIZE);
struct bch_fs *c = container_of(j, struct bch_fs, journal);
union journal_res_state *s = &j->reservations;
struct bch_dev *ca;
unsigned iter;
- ssize_t ret = 0;
rcu_read_lock();
spin_lock(&j->lock);
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "active journal entries:\t%llu\n"
- "seq:\t\t\t%llu\n"
- "last_seq:\t\t%llu\n"
- "last_seq_ondisk:\t%llu\n"
- "reservation count:\t%u\n"
- "reservation offset:\t%u\n"
- "current entry u64s:\t%u\n"
- "io in flight:\t\t%i\n"
- "need write:\t\t%i\n"
- "dirty:\t\t\t%i\n"
- "replay done:\t\t%i\n",
- fifo_used(&j->pin),
- journal_cur_seq(j),
- journal_last_seq(j),
- j->last_seq_ondisk,
- journal_state_count(*s, s->idx),
- s->cur_entry_offset,
- j->cur_entry_u64s,
- s->prev_buf_unwritten,
- test_bit(JOURNAL_NEED_WRITE, &j->flags),
- journal_entry_is_open(j),
- test_bit(JOURNAL_REPLAY_DONE, &j->flags));
+ pr_buf(&out,
+ "active journal entries:\t%llu\n"
+ "seq:\t\t\t%llu\n"
+ "last_seq:\t\t%llu\n"
+ "last_seq_ondisk:\t%llu\n"
+ "reservation count:\t%u\n"
+ "reservation offset:\t%u\n"
+ "current entry u64s:\t%u\n"
+ "io in flight:\t\t%i\n"
+ "need write:\t\t%i\n"
+ "dirty:\t\t\t%i\n"
+ "replay done:\t\t%i\n",
+ fifo_used(&j->pin),
+ journal_cur_seq(j),
+ journal_last_seq(j),
+ j->last_seq_ondisk,
+ journal_state_count(*s, s->idx),
+ s->cur_entry_offset,
+ j->cur_entry_u64s,
+ s->prev_buf_unwritten,
+ test_bit(JOURNAL_NEED_WRITE, &j->flags),
+ journal_entry_is_open(j),
+ test_bit(JOURNAL_REPLAY_DONE, &j->flags));
for_each_member_device_rcu(ca, c, iter,
&c->rw_devs[BCH_DATA_JOURNAL]) {
if (!ja->nr)
continue;
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "dev %u:\n"
- "\tnr\t\t%u\n"
- "\tcur_idx\t\t%u (seq %llu)\n"
- "\tlast_idx\t%u (seq %llu)\n",
- iter, ja->nr,
- ja->cur_idx, ja->bucket_seq[ja->cur_idx],
- ja->last_idx, ja->bucket_seq[ja->last_idx]);
+ pr_buf(&out,
+ "dev %u:\n"
+ "\tnr\t\t%u\n"
+ "\tcur_idx\t\t%u (seq %llu)\n"
+ "\tlast_idx\t%u (seq %llu)\n",
+ iter, ja->nr,
+ ja->cur_idx, ja->bucket_seq[ja->cur_idx],
+ ja->last_idx, ja->bucket_seq[ja->last_idx]);
}
spin_unlock(&j->lock);
rcu_read_unlock();
- return ret;
+ return out.pos - buf;
}
ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
{
+ struct printbuf out = _PBUF(buf, PAGE_SIZE);
struct journal_entry_pin_list *pin_list;
struct journal_entry_pin *pin;
- ssize_t ret = 0;
u64 i;
spin_lock(&j->lock);
fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "%llu: count %u\n",
- i, atomic_read(&pin_list->count));
+ pr_buf(&out, "%llu: count %u\n",
+ i, atomic_read(&pin_list->count));
list_for_each_entry(pin, &pin_list->list, list)
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "\t%p %pf\n",
- pin, pin->flush);
+ pr_buf(&out, "\t%p %pf\n",
+ pin, pin->flush);
if (!list_empty(&pin_list->flushed))
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "flushed:\n");
+ pr_buf(&out, "flushed:\n");
list_for_each_entry(pin, &pin_list->flushed, list)
- ret += scnprintf(buf + ret, PAGE_SIZE - ret,
- "\t%p %pf\n",
- pin, pin->flush);
+ pr_buf(&out, "\t%p %pf\n",
+ pin, pin->flush);
}
spin_unlock(&j->lock);
- return ret;
+ return out.pos - buf;
}