--- /dev/null
+#include <string.h>
+#include <liburing.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <memory>
+#include <functional>
+
+#include "io_uring_engine.h"
+
+using namespace std;
+
+IOUringEngine::IOUringEngine()
+{
+ int ret = io_uring_queue_init(queue_depth, &ring, 0);
+ using_uring = (ret >= 0);
+}
+
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+{
+ if (!using_uring) {
+ // Synchronous read.
+ string s;
+ s.resize(len);
+ complete_pread(fd, &s[0], len, offset);
+ cb(move(s));
+ return;
+ }
+
+ if (pending_reads < queue_depth) {
+ io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+ if (sqe == nullptr) {
+ fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+ exit(1);
+ }
+ submit_read_internal(sqe, fd, len, offset, move(cb));
+ } else {
+ queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
+ }
+}
+
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+{
+ void *buf;
+ if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+ fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
+ exit(1);
+ }
+ PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
+
+ io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
+ io_uring_sqe_set_data(sqe, pending);
+ ++pending_reads;
+}
+
+void IOUringEngine::finish()
+{
+ if (!using_uring) {
+ return;
+ }
+
+ int ret = io_uring_submit(&ring);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
+ exit(1);
+ }
+ bool anything_to_submit = false;
+ while (pending_reads > 0) {
+ io_uring_cqe *cqe;
+ ret = io_uring_wait_cqe(&ring, &cqe);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+ exit(1);
+ }
+
+ PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
+ if (cqe->res <= 0) {
+ fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
+ exit(1);
+ }
+
+ if (size_t(cqe->res) < pending->iov.iov_len) {
+ // Incomplete read, so resubmit it.
+ pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
+ pending->iov.iov_len -= cqe->res;
+ pending->offset += cqe->res;
+ io_uring_cqe_seen(&ring, cqe);
+
+ io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+ if (sqe == nullptr) {
+ fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+ exit(1);
+ }
+ io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
+ io_uring_sqe_set_data(sqe, pending);
+ anything_to_submit = true;
+ } else {
+ io_uring_cqe_seen(&ring, cqe);
+ --pending_reads;
+
+ size_t old_pending_reads = pending_reads;
+ pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+ free(pending->buf);
+ delete pending;
+
+ if (pending_reads != old_pending_reads) {
+ // A new read was made in the callback (and not queued),
+ // so we need to re-submit.
+ anything_to_submit = true;
+ }
+ }
+
+ // See if there are any queued reads we can submit now.
+ while (!queued_reads.empty() && pending_reads < queue_depth) {
+ io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+ if (sqe == nullptr) {
+ fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+ exit(1);
+ }
+ QueuedRead &qr = queued_reads.front();
+ submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
+ queued_reads.pop();
+ anything_to_submit = true;
+ }
+
+ if (anything_to_submit) {
+ // A new read was made, so we need to re-submit.
+ int ret = io_uring_submit(&ring);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
+ exit(1);
+ } else {
+ anything_to_submit = false;
+ }
+ }
+ }
+}
+
+void complete_pread(int fd, void *ptr, size_t len, off_t offset)
+{
+ while (len > 0) {
+ ssize_t ret = pread(fd, ptr, len, offset);
+ if (ret == -1 && errno == EINTR) {
+ continue;
+ }
+ if (ret <= 0) {
+ perror("pread");
+ exit(1);
+ }
+ ptr = reinterpret_cast<char *>(ptr) + ret;
+ len -= ret;
+ offset -= ret;
+ }
+}
+
+
#include "vp4.h"
+#include "io_uring_engine.h"
#include <algorithm>
#include <arpa/inet.h>
#include <chrono>
#include <endian.h>
#include <fcntl.h>
+#include <functional>
+#include <memory>
#include <stdio.h>
#include <string.h>
#include <string>
-#include <sys/mman.h>
#include <unistd.h>
#include <unordered_map>
#include <vector>
#define dprintf(...)
//#define dprintf(...) fprintf(stderr, __VA_ARGS__);
+class Serializer {
+public:
+ void do_or_wait(int seq, function<void()> cb);
+
+private:
+ int next_seq = 0;
+ struct Element {
+ int seq;
+ function<void()> cb;
+
+ bool operator<(const Element &other) const
+ {
+ return seq > other.seq;
+ }
+ };
+ priority_queue<Element> pending;
+};
+
+void Serializer::do_or_wait(int seq, function<void()> cb)
+{
+ if (seq != next_seq) {
+ pending.emplace(Element{ seq, move(cb) });
+ return;
+ }
+
+ cb();
+ ++next_seq;
+
+ while (!pending.empty() && pending.top().seq == next_seq) {
+ pending.top().cb();
+ pending.pop();
+ ++next_seq;
+ }
+}
+
static inline uint32_t read_unigram(const string &s, size_t idx)
{
if (idx < s.size()) {
uint32_t trgm;
uint32_t num_docids;
uint64_t offset;
+
+ bool operator==(const Trigram &other) const
+ {
+ return trgm == other.trgm;
+ }
+ bool operator<(const Trigram &other) const
+ {
+ return trgm < other.trgm;
+ }
};
class Corpus {
public:
- Corpus(int fd);
+ Corpus(int fd, IOUringEngine *engine);
~Corpus();
- const Trigram *find_trigram(uint32_t trgm) const;
- const unsigned char *
- get_compressed_posting_list(const Trigram *trigram) const;
- string_view get_compressed_filename_block(uint32_t docid) const;
+ void find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb);
+ void get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const;
size_t get_num_filename_blocks() const;
+ off_t offset_for_block(uint32_t docid) const {
+ return filename_index_offset + docid * sizeof(uint64_t);
+ }
-private:
+public:
const int fd;
+ IOUringEngine *const engine;
+
off_t len;
- const char *data;
- const uint64_t *filename_offsets;
- const Trigram *trgm_begin, *trgm_end;
+ uint64_t filename_index_offset;
+
+ uint64_t num_trigrams;
+ const off_t trigram_offset = sizeof(uint64_t) * 2;
+
+ void binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function<void(const Trigram *trgmptr, size_t len)> cb);
};
-Corpus::Corpus(int fd)
- : fd(fd)
+Corpus::Corpus(int fd, IOUringEngine *engine)
+ : fd(fd), engine(engine)
{
len = lseek(fd, 0, SEEK_END);
if (len == -1) {
perror("lseek");
exit(1);
}
- data = (char *)mmap(nullptr, len, PROT_READ, MAP_SHARED, fd, /*offset=*/0);
- if (data == MAP_FAILED) {
- perror("mmap");
- exit(1);
- }
- uint64_t num_trigrams = *(const uint64_t *)data;
- uint64_t filename_index_offset = *(const uint64_t *)(data + sizeof(uint64_t));
- filename_offsets = (const uint64_t *)(data + filename_index_offset);
+ // Uncomment to test cold-cache behavior (except for access()).
+ // posix_fadvise(fd, 0, len, POSIX_FADV_DONTNEED);
- trgm_begin = (Trigram *)(data + sizeof(uint64_t) * 2);
- trgm_end = trgm_begin + num_trigrams;
+ uint64_t vals[2];
+ complete_pread(fd, vals, sizeof(vals), /*offset=*/0);
+
+ num_trigrams = vals[0];
+ filename_index_offset = vals[1];
}
Corpus::~Corpus()
{
- munmap((void *)data, len);
close(fd);
}
-const Trigram *Corpus::find_trigram(uint32_t trgm) const
+void Corpus::find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb)
{
- const Trigram *trgmptr = lower_bound(
- trgm_begin, trgm_end, trgm,
- [](const Trigram &trgm, uint32_t t) { return trgm.trgm < t; });
- if (trgmptr == trgm_end || trgmptr->trgm != trgm) {
- return nullptr;
- }
- return trgmptr;
+ binary_search_trigram(trgm, 0, num_trigrams - 1, move(cb));
}
-const unsigned char *
-Corpus::get_compressed_posting_list(const Trigram *trgmptr) const
+void Corpus::binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function<void(const Trigram *trgmptr, size_t len)> cb)
{
- return reinterpret_cast<const unsigned char *>(data + trgmptr->offset);
+ if (left > right) {
+ cb(nullptr, 0);
+ return;
+ }
+ uint32_t mid = (left + right) / 2;
+ engine->submit_read(fd, sizeof(Trigram) * 2, trigram_offset + sizeof(Trigram) * mid, [this, trgm, left, mid, right, cb{ move(cb) }](string s) {
+ const Trigram *trgmptr = reinterpret_cast<const Trigram *>(s.data());
+ const Trigram *next_trgmptr = trgmptr + 1;
+ if (trgmptr->trgm < trgm) {
+ binary_search_trigram(trgm, mid + 1, right, move(cb));
+ } else if (trgmptr->trgm > trgm) {
+ binary_search_trigram(trgm, left, mid - 1, move(cb));
+ } else {
+ cb(trgmptr, next_trgmptr->offset - trgmptr->offset);
+ }
+ });
}
-string_view Corpus::get_compressed_filename_block(uint32_t docid) const
+void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const
{
- const char *compressed = (const char *)(data + filename_offsets[docid]);
- size_t compressed_size =
- filename_offsets[docid + 1] -
- filename_offsets[docid]; // Allowed we have a sentinel block at the end.
- return { compressed, compressed_size };
+ // Read the file offset from this docid and the next one.
+ // This is always allowed, since we have a sentinel block at the end.
+ engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string s) {
+ const uint64_t *ptr = reinterpret_cast<const uint64_t *>(s.data());
+ off_t offset = ptr[0];
+ size_t len = ptr[1] - ptr[0];
+ engine->submit_read(fd, len, offset, cb);
+ });
}
size_t Corpus::get_num_filename_blocks() const
{
// The beginning of the filename blocks is the end of the filename index blocks.
- const uint64_t *filename_offsets_end = (const uint64_t *)(data + filename_offsets[0]);
+ uint64_t end;
+ complete_pread(fd, &end, sizeof(end), filename_index_offset);
// Subtract the sentinel block.
- return filename_offsets_end - filename_offsets - 1;
+ return (end - filename_index_offset) / sizeof(uint64_t) - 1;
}
-size_t scan_docid(const string &needle, uint32_t docid, const Corpus &corpus,
- unordered_map<string, bool> *access_rx_cache)
+size_t scan_file_block(const string &needle, string_view compressed,
+ unordered_map<string, bool> *access_rx_cache)
{
- string_view compressed = corpus.get_compressed_filename_block(docid);
size_t matched = 0;
string block;
return matched;
}
+size_t scan_docids(const string &needle, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
+{
+ Serializer docids_in_order;
+ unordered_map<string, bool> access_rx_cache;
+ size_t matched = 0;
+ for (size_t i = 0; i < docids.size(); ++i) {
+ uint32_t docid = docids[i];
+ corpus.get_compressed_filename_block(docid, [i, &matched, &needle, &access_rx_cache, &docids_in_order](string compressed) {
+ docids_in_order.do_or_wait(i, [&matched, &needle, compressed{ move(compressed) }, &access_rx_cache] {
+ matched += scan_file_block(needle, compressed, &access_rx_cache);
+ });
+ });
+ }
+ engine->finish();
+ return matched;
+}
+
+// We do this sequentially, as it's faster than scattering
+// a lot of I/O through io_uring and hoping the kernel will
+// coalesce it plus readahead for us.
+void scan_all_docids(const string &needle, int fd, const Corpus &corpus, IOUringEngine *engine)
+{
+ unordered_map<string, bool> access_rx_cache;
+ uint32_t num_blocks = corpus.get_num_filename_blocks();
+ unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
+ complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
+ string compressed;
+ for (uint32_t io_docid = 0; io_docid < num_blocks; io_docid += 32) {
+ uint32_t last_docid = std::min(io_docid + 32, num_blocks);
+ size_t io_len = offsets[last_docid] - offsets[io_docid];
+ if (compressed.size() < io_len) {
+ compressed.resize(io_len);
+ }
+ complete_pread(fd, &compressed[0], io_len, offsets[io_docid]);
+
+ for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
+ size_t relative_offset = offsets[docid] - offsets[io_docid];
+ size_t len = offsets[docid + 1] - offsets[docid];
+ scan_file_block(needle, {&compressed[relative_offset], len}, &access_rx_cache);
+ }
+ }
+}
+
void do_search_file(const string &needle, const char *filename)
{
int fd = open(filename, O_RDONLY);
exit(EXIT_FAILURE);
}
- // steady_clock::time_point start = steady_clock::now();
+ steady_clock::time_point start __attribute__((unused)) = steady_clock::now();
if (access("/", R_OK | X_OK)) {
// We can't find anything, no need to bother...
return;
}
- Corpus corpus(fd);
+ IOUringEngine engine;
+ Corpus corpus(fd, &engine);
+ dprintf("Corpus init took %.1f ms.\n", 1e3 * duration<float>(steady_clock::now() - start).count());
if (needle.size() < 3) {
// Too short for trigram matching. Apply brute force.
// (We could have searched through all trigrams that matched
// the pattern and done a union of them, but that's a lot of
// work for fairly unclear gain.)
- unordered_map<string, bool> access_rx_cache;
- uint32_t num_blocks = corpus.get_num_filename_blocks();
- for (uint32_t docid = 0; docid < num_blocks; ++docid) {
- scan_docid(needle, docid, corpus, &access_rx_cache);
- }
+ scan_all_docids(needle, fd, corpus, &engine);
return;
}
- vector<const Trigram *> trigrams;
+ vector<pair<Trigram, size_t>> trigrams;
for (size_t i = 0; i < needle.size() - 2; ++i) {
uint32_t trgm = read_trigram(needle, i);
- const Trigram *trgmptr = corpus.find_trigram(trgm);
- if (trgmptr == nullptr) {
- dprintf("trigram %06x isn't found, we abort the search\n", trgm);
- return;
- }
- trigrams.push_back(trgmptr);
+ pair<uint32_t, uint32_t> range{ 0, corpus.num_trigrams - 1 };
+ corpus.find_trigram(trgm, [trgm, &trigrams](const Trigram *trgmptr, size_t len) {
+ if (trgmptr == nullptr) {
+ dprintf("trigram %06x isn't found, we abort the search\n", trgm);
+ return;
+ }
+ trigrams.emplace_back(*trgmptr, len);
+ });
}
+ engine.finish();
+ dprintf("Binary search took %.1f ms.\n", 1e3 * duration<float>(steady_clock::now() - start).count());
+
sort(trigrams.begin(), trigrams.end());
{
auto last = unique(trigrams.begin(), trigrams.end());
trigrams.erase(last, trigrams.end());
}
sort(trigrams.begin(), trigrams.end(),
- [&](const Trigram *a, const Trigram *b) {
- return a->num_docids < b->num_docids;
+ [&](const pair<Trigram, size_t> &a, const pair<Trigram, size_t> &b) {
+ return a.first.num_docids < b.first.num_docids;
});
vector<uint32_t> in1, in2, out;
- for (const Trigram *trgmptr : trigrams) {
- // uint32_t trgm = trgmptr->trgm;
- size_t num = trgmptr->num_docids;
- const unsigned char *pldata = corpus.get_compressed_posting_list(trgmptr);
- if (in1.empty()) {
- in1.resize(num + 128);
- p4nd1dec128v32(const_cast<unsigned char *>(pldata), num, &in1[0]);
- in1.resize(num);
- dprintf("trigram '%c%c%c' decoded to %zu entries\n", trgm & 0xff,
- (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num);
- } else {
- if (num > in1.size() * 100) {
- dprintf("trigram '%c%c%c' has %zu entries, ignoring the rest (will "
- "weed out false positives later)\n",
- trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num);
- break;
- }
+ bool done = false;
+ for (auto [trgmptr, len] : trigrams) {
+ if (!in1.empty() && trgmptr.num_docids > in1.size() * 100) {
+ uint32_t trgm __attribute__((unused)) = trgmptr.trgm;
+ dprintf("trigram '%c%c%c' (%zu bytes) has %u entries, ignoring the rest (will "
+ "weed out false positives later)\n",
+ trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff,
+ len, trgmptr.num_docids);
+ break;
+ }
- if (in2.size() < num + 128) {
- in2.resize(num + 128);
- }
- p4nd1dec128v32(const_cast<unsigned char *>(pldata), num, &in2[0]);
-
- out.clear();
- set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num,
- back_inserter(out));
- swap(in1, out);
- dprintf("trigram '%c%c%c' decoded to %zu entries, %zu left\n",
- trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num,
- in1.size());
- if (in1.empty()) {
- dprintf("no matches (intersection list is empty)\n");
+ // Only stay a certain amount ahead, so that we don't spend I/O
+ // on reading the latter, large posting lists. We are unlikely
+ // to need them anyway, even if they should come in first.
+ if (engine.get_waiting_reads() >= 5) {
+ engine.finish();
+ if (done)
break;
- }
}
+ engine.submit_read(fd, len, trgmptr.offset, [trgmptr, len, &done, &in1, &in2, &out](string s) {
+ uint32_t trgm __attribute__((unused)) = trgmptr.trgm;
+ size_t num = trgmptr.num_docids;
+ unsigned char *pldata = reinterpret_cast<unsigned char *>(s.data());
+ if (in1.empty()) {
+ in1.resize(num + 128);
+ p4nd1dec128v32(pldata, num, &in1[0]);
+ in1.resize(num);
+ dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries\n", trgm & 0xff,
+ (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, len, num);
+ } else {
+ if (in2.size() < num + 128) {
+ in2.resize(num + 128);
+ }
+ p4nd1dec128v32(pldata, num, &in2[0]);
+
+ out.clear();
+ set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num,
+ back_inserter(out));
+ swap(in1, out);
+ dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries, %zu left\n",
+ trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff,
+ len, num, in1.size());
+ if (in1.empty()) {
+ dprintf("no matches (intersection list is empty)\n");
+ done = true;
+ }
+ }
+ });
}
- steady_clock::time_point end = steady_clock::now();
-
+ engine.finish();
dprintf("Intersection took %.1f ms. Doing final verification and printing:\n",
- 1e3 * duration<float>(end - start).count());
-
- unordered_map<string, bool> access_rx_cache;
+ 1e3 * duration<float>(steady_clock::now() - start).count());
- int matched = 0;
- for (uint32_t docid : in1) {
- matched += scan_docid(needle, docid, corpus, &access_rx_cache);
- }
- end = steady_clock::now();
- dprintf("Done in %.1f ms, found %d matches.\n",
- 1e3 * duration<float>(end - start).count(), matched);
+ size_t matched __attribute__((unused)) = scan_docids(needle, in1, corpus, &engine);
+ dprintf("Done in %.1f ms, found %zu matches.\n",
+ 1e3 * duration<float>(steady_clock::now() - start).count(), matched);
}
int main(int argc, char **argv)