From 009ba1838c9185844acd34458ef863828b8ab143 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Wed, 30 Sep 2020 10:20:10 +0200 Subject: [PATCH] Replace mmap with io_uring. This moves to explicit, asynchronous I/O through io_uring (Linux 5.1+), which speeds up cold-cache behavior on rotating media by 3x or so. It also removes any issues we might have with not fitting into 32-bit address spaces. If io_uring is not available, regular synchronous I/O will be used instead. For now, there's a dependency on liburing, but it will be optional soon for older systems. --- Makefile | 4 +- io_uring_engine.cpp | 155 +++++++++++++++++++++ io_uring_engine.h | 51 +++++++ plocate.cpp | 321 +++++++++++++++++++++++++++++--------------- 4 files changed, 424 insertions(+), 107 deletions(-) create mode 100644 io_uring_engine.cpp create mode 100644 io_uring_engine.h diff --git a/Makefile b/Makefile index ccbaf15..91e8b17 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ PREFIX ?= /usr/local all: plocate plocate-build -plocate: plocate.o TurboPFor-Integer-Compression/libic.a - $(CXX) -o $@ $^ -lzstd +plocate: plocate.o io_uring_engine.o TurboPFor-Integer-Compression/libic.a + $(CXX) -o $@ $^ -lzstd $(shell pkg-config --libs liburing) plocate-build: plocate-build.o TurboPFor-Integer-Compression/libic.a $(CXX) -o $@ $^ -lzstd diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp new file mode 100644 index 0000000..b10639d --- /dev/null +++ b/io_uring_engine.cpp @@ -0,0 +1,155 @@ +#include +#include +#include +#include +#include +#include + +#include "io_uring_engine.h" + +using namespace std; + +IOUringEngine::IOUringEngine() +{ + int ret = io_uring_queue_init(queue_depth, &ring, 0); + using_uring = (ret >= 0); +} + +void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) +{ + if (!using_uring) { + // Synchronous read. 
+ string s; + s.resize(len); + complete_pread(fd, &s[0], len, offset); + cb(move(s)); + return; + } + + if (pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); + } + submit_read_internal(sqe, fd, len, offset, move(cb)); + } else { + queued_reads.push(QueuedRead{ fd, len, offset, move(cb) }); + } +} + +// Allocates a 4096-byte-aligned buffer of len bytes, prepares a readv on the +// given SQE with a heap-allocated PendingRead as user data, and counts it as +// pending. Nothing is submitted to the kernel here. +void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function cb) +{ + void *buf; + // posix_memalign() returns the error number and does not set errno, + // so report its return value rather than whatever errno holds. + int err = posix_memalign(&buf, /*alignment=*/4096, len); + if (err != 0) { + fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(err)); + exit(1); + } + PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } }; + + io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset); + io_uring_sqe_set_data(sqe, pending); + ++pending_reads; +} + +void IOUringEngine::finish() +{ + if (!using_uring) { + return; + } + + int ret = io_uring_submit(&ring); + if (ret < 0) { + fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret)); + exit(1); + } + bool anything_to_submit = false; + while (pending_reads > 0) { + io_uring_cqe *cqe; + ret = io_uring_wait_cqe(&ring, &cqe); + if (ret < 0) { + fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); + exit(1); + } + + PendingRead *pending = reinterpret_cast(cqe->user_data); + if (cqe->res <= 0) { + fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); + exit(1); + } + + if (size_t(cqe->res) < pending->iov.iov_len) { + // Incomplete read, so resubmit it. 
+ pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; + pending->iov.iov_len -= cqe->res; + pending->offset += cqe->res; + io_uring_cqe_seen(&ring, cqe); + + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); + exit(1); + } + io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); + io_uring_sqe_set_data(sqe, pending); + anything_to_submit = true; + } else { + io_uring_cqe_seen(&ring, cqe); + --pending_reads; + + size_t old_pending_reads = pending_reads; + pending->cb(string(reinterpret_cast(pending->buf), pending->len)); + free(pending->buf); + delete pending; + + if (pending_reads != old_pending_reads) { + // A new read was made in the callback (and not queued), + // so we need to re-submit. + anything_to_submit = true; + } + } + + // See if there are any queued reads we can submit now. + while (!queued_reads.empty() && pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); + } + QueuedRead &qr = queued_reads.front(); + submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb)); + queued_reads.pop(); + anything_to_submit = true; + } + + if (anything_to_submit) { + // A new read was made, so we need to re-submit. 
+ int ret = io_uring_submit(&ring); + if (ret < 0) { + fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); + exit(1); + } else { + anything_to_submit = false; + } + } + } +} + +// Reads exactly len bytes from fd, starting at offset, into ptr. +// Retries on EINTR and on short reads; prints an error and exits +// if pread() fails or returns 0 before len bytes have been read. +void complete_pread(int fd, void *ptr, size_t len, off_t offset) +{ + while (len > 0) { + ssize_t ret = pread(fd, ptr, len, offset); + if (ret == -1 && errno == EINTR) { + continue; + } + if (ret <= 0) { + perror("pread"); + exit(1); + } + // Advance both the buffer and the file position past the bytes + // we just got, so that a short read continues where it left off. + ptr = reinterpret_cast(ptr) + ret; + len -= ret; + offset += ret; + } +} + + diff --git a/io_uring_engine.h b/io_uring_engine.h new file mode 100644 index 0000000..93e7a0d --- /dev/null +++ b/io_uring_engine.h @@ -0,0 +1,51 @@ +#ifndef IO_URING_ENGINE_H +#define IO_URING_ENGINE_H 1 + +#include +#include +#include +#include +#include + +class IOUringEngine { +public: + IOUringEngine(); + void submit_read(int fd, size_t len, off_t offset, std::function cb); + void finish(); + size_t get_waiting_reads() const { return pending_reads + queued_reads.size(); } + +private: + void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function cb); + + io_uring ring; + size_t pending_reads = 0; // Number of requests we have going in the ring. + bool using_uring; + + struct QueuedRead { + int fd; + size_t len; + off_t offset; + std::function cb; + }; + std::queue queued_reads; + + struct PendingRead { + void *buf; + size_t len; + std::function cb; + + // For re-submission. + int fd; + off_t offset; + iovec iov; + }; + + // 256 simultaneous requests should be ample, for slow and fast media alike. + static constexpr size_t queue_depth = 256; +}; + +// A wrapper around pread() that never returns an incomplete read: +// it retries on EINTR and short reads, and exits on error or EOF. +// Always synchronous (no io_uring). 
+void complete_pread(int fd, void *ptr, size_t len, off_t offset); + +#endif // !defined(IO_URING_ENGINE_H) diff --git a/plocate.cpp b/plocate.cpp index 4b7fbc1..f461d0b 100644 --- a/plocate.cpp +++ b/plocate.cpp @@ -1,14 +1,16 @@ #include "vp4.h" +#include "io_uring_engine.h" #include #include #include #include #include +#include +#include #include #include #include -#include #include #include #include @@ -20,6 +22,41 @@ using namespace std::chrono; #define dprintf(...) //#define dprintf(...) fprintf(stderr, __VA_ARGS__); +class Serializer { +public: + void do_or_wait(int seq, function cb); + +private: + int next_seq = 0; + struct Element { + int seq; + function cb; + + bool operator<(const Element &other) const + { + return seq > other.seq; + } + }; + priority_queue pending; +}; + +void Serializer::do_or_wait(int seq, function cb) +{ + if (seq != next_seq) { + pending.emplace(Element{ seq, move(cb) }); + return; + } + + cb(); + ++next_seq; + + while (!pending.empty() && pending.top().seq == next_seq) { + pending.top().cb(); + pending.pop(); + ++next_seq; + } +} + static inline uint32_t read_unigram(const string &s, size_t idx) { if (idx < s.size()) { @@ -62,93 +99,115 @@ struct Trigram { uint32_t trgm; uint32_t num_docids; uint64_t offset; + + bool operator==(const Trigram &other) const + { + return trgm == other.trgm; + } + bool operator<(const Trigram &other) const + { + return trgm < other.trgm; + } }; class Corpus { public: - Corpus(int fd); + Corpus(int fd, IOUringEngine *engine); ~Corpus(); - const Trigram *find_trigram(uint32_t trgm) const; - const unsigned char * - get_compressed_posting_list(const Trigram *trigram) const; - string_view get_compressed_filename_block(uint32_t docid) const; + void find_trigram(uint32_t trgm, function cb); + void get_compressed_filename_block(uint32_t docid, function cb) const; size_t get_num_filename_blocks() const; + off_t offset_for_block(uint32_t docid) const { + return filename_index_offset + docid * sizeof(uint64_t); + 
+ } -private: +public: const int fd; + IOUringEngine *const engine; + off_t len; - const char *data; - const uint64_t *filename_offsets; - const Trigram *trgm_begin, *trgm_end; + uint64_t filename_index_offset; + + uint64_t num_trigrams; + const off_t trigram_offset = sizeof(uint64_t) * 2; + + void binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function cb); }; -Corpus::Corpus(int fd) - : fd(fd) +Corpus::Corpus(int fd, IOUringEngine *engine) + : fd(fd), engine(engine) { len = lseek(fd, 0, SEEK_END); if (len == -1) { perror("lseek"); exit(1); } - data = (char *)mmap(nullptr, len, PROT_READ, MAP_SHARED, fd, /*offset=*/0); - if (data == MAP_FAILED) { - perror("mmap"); - exit(1); - } - uint64_t num_trigrams = *(const uint64_t *)data; - uint64_t filename_index_offset = *(const uint64_t *)(data + sizeof(uint64_t)); - filename_offsets = (const uint64_t *)(data + filename_index_offset); + // Uncomment to test cold-cache behavior (except for access()). + // posix_fadvise(fd, 0, len, POSIX_FADV_DONTNEED); - trgm_begin = (Trigram *)(data + sizeof(uint64_t) * 2); - trgm_end = trgm_begin + num_trigrams; + uint64_t vals[2]; + complete_pread(fd, vals, sizeof(vals), /*offset=*/0); + + num_trigrams = vals[0]; + filename_index_offset = vals[1]; } Corpus::~Corpus() { - munmap((void *)data, len); close(fd); } -const Trigram *Corpus::find_trigram(uint32_t trgm) const +void Corpus::find_trigram(uint32_t trgm, function cb) { - const Trigram *trgmptr = lower_bound( - trgm_begin, trgm_end, trgm, - [](const Trigram &trgm, uint32_t t) { return trgm.trgm < t; }); - if (trgmptr == trgm_end || trgmptr->trgm != trgm) { - return nullptr; - } - return trgmptr; + if (num_trigrams == 0) { + // Empty table: num_trigrams - 1 would wrap around, so report "not found" directly. + cb(nullptr, 0); + return; + } + binary_search_trigram(trgm, 0, num_trigrams - 1, move(cb)); } -const unsigned char * -Corpus::get_compressed_posting_list(const Trigram *trgmptr) const +// Asynchronous binary search for trgm among the on-disk Trigram entries with +// indices in [left, right] (inclusive); each step reads entry mid and its successor. +// Calls cb(nullptr, 0) if trgm is absent; otherwise cb(entry, next entry's offset +// minus this entry's offset). The entry pointer refers to the read buffer and is +// only valid for the duration of the callback. +void Corpus::binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function cb) { - return reinterpret_cast(data + trgmptr->offset); + if (left > right) { 
+ cb(nullptr, 0); + return; + } + // Written this way so the midpoint cannot overflow uint32_t. + uint32_t mid = left + (right - left) / 2; + engine->submit_read(fd, sizeof(Trigram) * 2, trigram_offset + sizeof(Trigram) * mid, [this, trgm, left, mid, right, cb{ move(cb) }](string s) { + const Trigram *trgmptr = reinterpret_cast(s.data()); + const Trigram *next_trgmptr = trgmptr + 1; + if (trgmptr->trgm < trgm) { + binary_search_trigram(trgm, mid + 1, right, move(cb)); + } else if (trgmptr->trgm > trgm) { + if (mid == 0) { + // trgm sorts before the very first entry; mid - 1 would wrap + // to UINT32_MAX and send the search far outside the table. + cb(nullptr, 0); + } else { + binary_search_trigram(trgm, left, mid - 1, move(cb)); + } + } else { + cb(trgmptr, next_trgmptr->offset - trgmptr->offset); + } + }); } -string_view Corpus::get_compressed_filename_block(uint32_t docid) const +void Corpus::get_compressed_filename_block(uint32_t docid, function cb) const { - const char *compressed = (const char *)(data + filename_offsets[docid]); - size_t compressed_size = - filename_offsets[docid + 1] - - filename_offsets[docid]; // Allowed we have a sentinel block at the end. - return { compressed, compressed_size }; + // Read the file offset from this docid and the next one. + // This is always allowed, since we have a sentinel block at the end. + engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string s) { + const uint64_t *ptr = reinterpret_cast(s.data()); + off_t offset = ptr[0]; + size_t len = ptr[1] - ptr[0]; + engine->submit_read(fd, len, offset, cb); + }); } size_t Corpus::get_num_filename_blocks() const { // The beginning of the filename blocks is the end of the filename index blocks. - const uint64_t *filename_offsets_end = (const uint64_t *)(data + filename_offsets[0]); + uint64_t end; + complete_pread(fd, &end, sizeof(end), filename_index_offset); // Subtract the sentinel block. 
- return filename_offsets_end - filename_offsets - 1; + return (end - filename_index_offset) / sizeof(uint64_t) - 1; } -size_t scan_docid(const string &needle, uint32_t docid, const Corpus &corpus, - unordered_map *access_rx_cache) +size_t scan_file_block(const string &needle, string_view compressed, + unordered_map *access_rx_cache) { - string_view compressed = corpus.get_compressed_filename_block(docid); size_t matched = 0; string block; @@ -173,6 +232,49 @@ size_t scan_docid(const string &needle, uint32_t docid, const Corpus &corpus, return matched; } +size_t scan_docids(const string &needle, const vector &docids, const Corpus &corpus, IOUringEngine *engine) +{ + Serializer docids_in_order; + unordered_map access_rx_cache; + size_t matched = 0; + for (size_t i = 0; i < docids.size(); ++i) { + uint32_t docid = docids[i]; + corpus.get_compressed_filename_block(docid, [i, &matched, &needle, &access_rx_cache, &docids_in_order](string compressed) { + docids_in_order.do_or_wait(i, [&matched, &needle, compressed{ move(compressed) }, &access_rx_cache] { + matched += scan_file_block(needle, compressed, &access_rx_cache); + }); + }); + } + engine->finish(); + return matched; +} + +// We do this sequentially, as it's faster than scattering +// a lot of I/O through io_uring and hoping the kernel will +// coalesce it plus readahead for us. 
+void scan_all_docids(const string &needle, int fd, const Corpus &corpus, IOUringEngine *engine) +{ + unordered_map access_rx_cache; + uint32_t num_blocks = corpus.get_num_filename_blocks(); + unique_ptr offsets(new uint64_t[num_blocks + 1]); + complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0)); + string compressed; + for (uint32_t io_docid = 0; io_docid < num_blocks; io_docid += 32) { + uint32_t last_docid = std::min(io_docid + 32, num_blocks); + size_t io_len = offsets[last_docid] - offsets[io_docid]; + if (compressed.size() < io_len) { + compressed.resize(io_len); + } + complete_pread(fd, &compressed[0], io_len, offsets[io_docid]); + + for (uint32_t docid = io_docid; docid < last_docid; ++docid) { + size_t relative_offset = offsets[docid] - offsets[io_docid]; + size_t len = offsets[docid + 1] - offsets[docid]; + scan_file_block(needle, {&compressed[relative_offset], len}, &access_rx_cache); + } + } +} + void do_search_file(const string &needle, const char *filename) { int fd = open(filename, O_RDONLY); @@ -187,98 +289,107 @@ void do_search_file(const string &needle, const char *filename) exit(EXIT_FAILURE); } - // steady_clock::time_point start = steady_clock::now(); + steady_clock::time_point start __attribute__((unused)) = steady_clock::now(); if (access("/", R_OK | X_OK)) { // We can't find anything, no need to bother... return; } - Corpus corpus(fd); + IOUringEngine engine; + Corpus corpus(fd, &engine); + dprintf("Corpus init took %.1f ms.\n", 1e3 * duration(steady_clock::now() - start).count()); if (needle.size() < 3) { // Too short for trigram matching. Apply brute force. // (We could have searched through all trigrams that matched // the pattern and done a union of them, but that's a lot of // work for fairly unclear gain.) 
- unordered_map access_rx_cache; - uint32_t num_blocks = corpus.get_num_filename_blocks(); - for (uint32_t docid = 0; docid < num_blocks; ++docid) { - scan_docid(needle, docid, corpus, &access_rx_cache); - } + scan_all_docids(needle, fd, corpus, &engine); return; } - vector trigrams; + vector> trigrams; for (size_t i = 0; i < needle.size() - 2; ++i) { uint32_t trgm = read_trigram(needle, i); - const Trigram *trgmptr = corpus.find_trigram(trgm); - if (trgmptr == nullptr) { - dprintf("trigram %06x isn't found, we abort the search\n", trgm); - return; - } - trigrams.push_back(trgmptr); + pair range{ 0, corpus.num_trigrams - 1 }; + corpus.find_trigram(trgm, [trgm, &trigrams](const Trigram *trgmptr, size_t len) { + if (trgmptr == nullptr) { + dprintf("trigram %06x isn't found, we abort the search\n", trgm); + return; + } + trigrams.emplace_back(*trgmptr, len); + }); } + engine.finish(); + dprintf("Binary search took %.1f ms.\n", 1e3 * duration(steady_clock::now() - start).count()); + sort(trigrams.begin(), trigrams.end()); { auto last = unique(trigrams.begin(), trigrams.end()); trigrams.erase(last, trigrams.end()); } sort(trigrams.begin(), trigrams.end(), - [&](const Trigram *a, const Trigram *b) { - return a->num_docids < b->num_docids; + [&](const pair &a, const pair &b) { + return a.first.num_docids < b.first.num_docids; }); vector in1, in2, out; - for (const Trigram *trgmptr : trigrams) { - // uint32_t trgm = trgmptr->trgm; - size_t num = trgmptr->num_docids; - const unsigned char *pldata = corpus.get_compressed_posting_list(trgmptr); - if (in1.empty()) { - in1.resize(num + 128); - p4nd1dec128v32(const_cast(pldata), num, &in1[0]); - in1.resize(num); - dprintf("trigram '%c%c%c' decoded to %zu entries\n", trgm & 0xff, - (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num); - } else { - if (num > in1.size() * 100) { - dprintf("trigram '%c%c%c' has %zu entries, ignoring the rest (will " - "weed out false positives later)\n", - trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 
0xff, num); - break; - } + bool done = false; + for (auto [trgmptr, len] : trigrams) { + if (!in1.empty() && trgmptr.num_docids > in1.size() * 100) { + uint32_t trgm __attribute__((unused)) = trgmptr.trgm; + dprintf("trigram '%c%c%c' (%zu bytes) has %u entries, ignoring the rest (will " + "weed out false positives later)\n", + trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, + len, trgmptr.num_docids); + break; + } - if (in2.size() < num + 128) { - in2.resize(num + 128); - } - p4nd1dec128v32(const_cast(pldata), num, &in2[0]); - - out.clear(); - set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num, - back_inserter(out)); - swap(in1, out); - dprintf("trigram '%c%c%c' decoded to %zu entries, %zu left\n", - trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num, - in1.size()); - if (in1.empty()) { - dprintf("no matches (intersection list is empty)\n"); + // Only stay a certain amount ahead, so that we don't spend I/O + // on reading the latter, large posting lists. We are unlikely + // to need them anyway, even if they should come in first. 
+ if (engine.get_waiting_reads() >= 5) { + engine.finish(); + if (done) break; - } } + engine.submit_read(fd, len, trgmptr.offset, [trgmptr, len, &done, &in1, &in2, &out](string s) { + uint32_t trgm __attribute__((unused)) = trgmptr.trgm; + size_t num = trgmptr.num_docids; + unsigned char *pldata = reinterpret_cast(s.data()); + if (in1.empty()) { + in1.resize(num + 128); + p4nd1dec128v32(pldata, num, &in1[0]); + in1.resize(num); + dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries\n", trgm & 0xff, + (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, len, num); + } else { + if (in2.size() < num + 128) { + in2.resize(num + 128); + } + p4nd1dec128v32(pldata, num, &in2[0]); + + out.clear(); + set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num, + back_inserter(out)); + swap(in1, out); + dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries, %zu left\n", + trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, + len, num, in1.size()); + if (in1.empty()) { + dprintf("no matches (intersection list is empty)\n"); + done = true; + } + } + }); } - steady_clock::time_point end = steady_clock::now(); - + engine.finish(); dprintf("Intersection took %.1f ms. Doing final verification and printing:\n", - 1e3 * duration(end - start).count()); - - unordered_map access_rx_cache; + 1e3 * duration(steady_clock::now() - start).count()); - int matched = 0; - for (uint32_t docid : in1) { - matched += scan_docid(needle, docid, corpus, &access_rx_cache); - } - end = steady_clock::now(); - dprintf("Done in %.1f ms, found %d matches.\n", - 1e3 * duration(end - start).count(), matched); + size_t matched __attribute__((unused)) = scan_docids(needle, in1, corpus, &engine); + dprintf("Done in %.1f ms, found %zu matches.\n", + 1e3 * duration(steady_clock::now() - start).count(), matched); } int main(int argc, char **argv) -- 2.39.2