]> git.sesse.net Git - plocate/commitdiff
Replace mmap with io_uring.
authorSteinar H. Gunderson <steinar+git@gunderson.no>
Wed, 30 Sep 2020 08:20:10 +0000 (10:20 +0200)
committerSteinar H. Gunderson <steinar+git@gunderson.no>
Wed, 30 Sep 2020 08:28:27 +0000 (10:28 +0200)
This moves to explicit, asynchronous I/O through io_uring (Linux 5.1+),
which speeds up cold-cache behavior on rotating media by 3x or so.
It also removes any issues we might have with not fitting into 32-bit
address spaces.

If io_uring is not available, regular synchronous I/O will be used instead.
For now, there's a dependency on liburing, but it will be optional soon
for older systems.

Makefile
io_uring_engine.cpp [new file with mode: 0644]
io_uring_engine.h [new file with mode: 0644]
plocate.cpp

index ccbaf1547910ed4a935c9adf37f1063793a0591d..91e8b178ac7b3540aab5f5f0140fdf72c64d6837 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -7,8 +7,8 @@ PREFIX ?= /usr/local
 
 all: plocate plocate-build
 
-plocate: plocate.o TurboPFor-Integer-Compression/libic.a
-       $(CXX) -o $@ $^ -lzstd
+plocate: plocate.o io_uring_engine.o TurboPFor-Integer-Compression/libic.a
+       $(CXX) -o $@ $^ -lzstd $(shell pkg-config --libs liburing)
 
 plocate-build: plocate-build.o TurboPFor-Integer-Compression/libic.a
        $(CXX) -o $@ $^ -lzstd
diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp
new file mode 100644 (file)
index 0000000..b10639d
--- /dev/null
@@ -0,0 +1,155 @@
+#include <string.h>
+#include <liburing.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <memory>
+#include <functional>
+
+#include "io_uring_engine.h"
+
+using namespace std;
+
+IOUringEngine::IOUringEngine()
+{
+       int ret = io_uring_queue_init(queue_depth, &ring, 0);
+       using_uring = (ret >= 0);
+}
+
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+{
+       if (!using_uring) {
+               // Synchronous read.
+               string s;
+               s.resize(len);
+               complete_pread(fd, &s[0], len, offset);
+               cb(move(s));
+               return;
+       }
+
+       if (pending_reads < queue_depth) {
+               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+               if (sqe == nullptr) {
+                       fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                       exit(1);
+               }
+               submit_read_internal(sqe, fd, len, offset, move(cb));
+       } else {
+               queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
+       }
+}
+
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+{
+       void *buf;
+       if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+               fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
+               exit(1);
+       }
+       PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
+
+       io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
+       io_uring_sqe_set_data(sqe, pending);
+       ++pending_reads;
+}
+
+void IOUringEngine::finish()
+{
+       if (!using_uring) {
+               return;
+       }
+
+       int ret = io_uring_submit(&ring);
+       if (ret < 0) {
+               fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
+               exit(1);
+       }
+       bool anything_to_submit = false;
+       while (pending_reads > 0) {
+               io_uring_cqe *cqe;
+               ret = io_uring_wait_cqe(&ring, &cqe);
+               if (ret < 0) {
+                       fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+                       exit(1);
+               }
+
+               PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
+               if (cqe->res <= 0) {
+                       fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
+                       exit(1);
+               }
+
+               if (size_t(cqe->res) < pending->iov.iov_len) {
+                       // Incomplete read, so resubmit it.
+                       pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
+                       pending->iov.iov_len -= cqe->res;
+                       pending->offset += cqe->res;
+                       io_uring_cqe_seen(&ring, cqe);
+
+                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                       if (sqe == nullptr) {
+                               fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+                               exit(1);
+                       }
+                       io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
+                       io_uring_sqe_set_data(sqe, pending);
+                       anything_to_submit = true;
+               } else {
+                       io_uring_cqe_seen(&ring, cqe);
+                       --pending_reads;
+
+                       size_t old_pending_reads = pending_reads;
+                       pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+                       free(pending->buf);
+                       delete pending;
+
+                       if (pending_reads != old_pending_reads) {
+                               // A new read was made in the callback (and not queued),
+                               // so we need to re-submit.
+                               anything_to_submit = true;
+                       }
+               }
+
+               // See if there are any queued reads we can submit now.
+               while (!queued_reads.empty() && pending_reads < queue_depth) {
+                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                       if (sqe == nullptr) {
+                               fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                               exit(1);
+                       }
+                       QueuedRead &qr = queued_reads.front();
+                       submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
+                       queued_reads.pop();
+                       anything_to_submit = true;
+               }
+
+               if (anything_to_submit) {
+                       // A new read was made, so we need to re-submit.
+                       int ret = io_uring_submit(&ring);
+                       if (ret < 0) {
+                               fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
+                               exit(1);
+                       } else {
+                               anything_to_submit = false;
+                       }
+               }
+       }
+}
+
+void complete_pread(int fd, void *ptr, size_t len, off_t offset)
+{
+       while (len > 0) {
+               ssize_t ret = pread(fd, ptr, len, offset);
+               if (ret == -1 && errno == EINTR) {
+                       continue;
+               }
+               if (ret <= 0) {
+                       perror("pread");
+                       exit(1);
+               }
+               ptr = reinterpret_cast<char *>(ptr) + ret;
+               len -= ret;
+               offset -= ret;
+       }
+}
+
+
diff --git a/io_uring_engine.h b/io_uring_engine.h
new file mode 100644 (file)
index 0000000..93e7a0d
--- /dev/null
@@ -0,0 +1,51 @@
+#ifndef IO_URING_ENGINE_H
+#define IO_URING_ENGINE_H 1
+
+#include <functional>
+#include <queue>
+#include <string>
+#include <stdint.h>
+#include <liburing.h>
+
+class IOUringEngine {
+public:
+       IOUringEngine();
+       void submit_read(int fd, size_t len, off_t offset, std::function<void(std::string)> cb);
+       void finish();
+       size_t get_waiting_reads() const { return pending_reads + queued_reads.size(); }
+
+private:
+       void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function<void(std::string)> cb);
+
+       io_uring ring;
+       size_t pending_reads = 0;  // Number of requests we have going in the ring.
+       bool using_uring;
+
+       struct QueuedRead {
+               int fd;
+               size_t len;
+               off_t offset;
+               std::function<void(std::string)> cb;
+       };
+       std::queue<QueuedRead> queued_reads;
+
+       struct PendingRead {
+               void *buf;
+               size_t len;
+               std::function<void(std::string)> cb;
+
+               // For re-submission.
+               int fd;
+               off_t offset;
+               iovec iov;
+       };
+
+       // 256 simultaneous requests should be ample, for slow and fast media alike.
+       static constexpr size_t queue_depth = 256;
+};
+
+// A wrapper around pread() that returns an incomplete read.
+// Always synchronous (no io_uring).
+void complete_pread(int fd, void *ptr, size_t len, off_t offset);
+
+#endif  // !defined(IO_URING_ENGINE_H)
index 4b7fbc15c9b4d47f790230854c3bfb971f0b439c..f461d0b32fed38c978bfd3a9eeafbe9418c8b2ab 100644 (file)
@@ -1,14 +1,16 @@
 #include "vp4.h"
+#include "io_uring_engine.h"
 
 #include <algorithm>
 #include <arpa/inet.h>
 #include <chrono>
 #include <endian.h>
 #include <fcntl.h>
+#include <functional>
+#include <memory>
 #include <stdio.h>
 #include <string.h>
 #include <string>
-#include <sys/mman.h>
 #include <unistd.h>
 #include <unordered_map>
 #include <vector>
@@ -20,6 +22,41 @@ using namespace std::chrono;
 #define dprintf(...)
 //#define dprintf(...) fprintf(stderr, __VA_ARGS__);
 
+class Serializer {
+public:
+       void do_or_wait(int seq, function<void()> cb);
+
+private:
+       int next_seq = 0;
+       struct Element {
+               int seq;
+               function<void()> cb;
+
+               bool operator<(const Element &other) const
+               {
+                       return seq > other.seq;
+               }
+       };
+       priority_queue<Element> pending;
+};
+
+void Serializer::do_or_wait(int seq, function<void()> cb)
+{
+       if (seq != next_seq) {
+               pending.emplace(Element{ seq, move(cb) });
+               return;
+       }
+
+       cb();
+       ++next_seq;
+
+       while (!pending.empty() && pending.top().seq == next_seq) {
+               pending.top().cb();
+               pending.pop();
+               ++next_seq;
+       }
+}
+
 static inline uint32_t read_unigram(const string &s, size_t idx)
 {
        if (idx < s.size()) {
@@ -62,93 +99,115 @@ struct Trigram {
        uint32_t trgm;
        uint32_t num_docids;
        uint64_t offset;
+
+       bool operator==(const Trigram &other) const
+       {
+               return trgm == other.trgm;
+       }
+       bool operator<(const Trigram &other) const
+       {
+               return trgm < other.trgm;
+       }
 };
 
 class Corpus {
 public:
-       Corpus(int fd);
+       Corpus(int fd, IOUringEngine *engine);
        ~Corpus();
-       const Trigram *find_trigram(uint32_t trgm) const;
-       const unsigned char *
-       get_compressed_posting_list(const Trigram *trigram) const;
-       string_view get_compressed_filename_block(uint32_t docid) const;
+       void find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb);
+       void get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const;
        size_t get_num_filename_blocks() const;
+       off_t offset_for_block(uint32_t docid) const {
+               return filename_index_offset + docid * sizeof(uint64_t);
+       }
 
-private:
+public:
        const int fd;
+       IOUringEngine *const engine;
+
        off_t len;
-       const char *data;
-       const uint64_t *filename_offsets;
-       const Trigram *trgm_begin, *trgm_end;
+       uint64_t filename_index_offset;
+
+       uint64_t num_trigrams;
+       const off_t trigram_offset = sizeof(uint64_t) * 2;
+
+       void binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function<void(const Trigram *trgmptr, size_t len)> cb);
 };
 
-Corpus::Corpus(int fd)
-       : fd(fd)
+Corpus::Corpus(int fd, IOUringEngine *engine)
+       : fd(fd), engine(engine)
 {
        len = lseek(fd, 0, SEEK_END);
        if (len == -1) {
                perror("lseek");
                exit(1);
        }
-       data = (char *)mmap(nullptr, len, PROT_READ, MAP_SHARED, fd, /*offset=*/0);
-       if (data == MAP_FAILED) {
-               perror("mmap");
-               exit(1);
-       }
 
-       uint64_t num_trigrams = *(const uint64_t *)data;
-       uint64_t filename_index_offset = *(const uint64_t *)(data + sizeof(uint64_t));
-       filename_offsets = (const uint64_t *)(data + filename_index_offset);
+       // Uncomment to test cold-cache behavior (except for access()).
+       // posix_fadvise(fd, 0, len, POSIX_FADV_DONTNEED);
 
-       trgm_begin = (Trigram *)(data + sizeof(uint64_t) * 2);
-       trgm_end = trgm_begin + num_trigrams;
+       uint64_t vals[2];
+       complete_pread(fd, vals, sizeof(vals), /*offset=*/0);
+
+       num_trigrams = vals[0];
+       filename_index_offset = vals[1];
 }
 
 Corpus::~Corpus()
 {
-       munmap((void *)data, len);
        close(fd);
 }
 
-const Trigram *Corpus::find_trigram(uint32_t trgm) const
+void Corpus::find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb)
 {
-       const Trigram *trgmptr = lower_bound(
-               trgm_begin, trgm_end, trgm,
-               [](const Trigram &trgm, uint32_t t) { return trgm.trgm < t; });
-       if (trgmptr == trgm_end || trgmptr->trgm != trgm) {
-               return nullptr;
-       }
-       return trgmptr;
+       binary_search_trigram(trgm, 0, num_trigrams - 1, move(cb));
 }
 
-const unsigned char *
-Corpus::get_compressed_posting_list(const Trigram *trgmptr) const
+void Corpus::binary_search_trigram(uint32_t trgm, uint32_t left, uint32_t right, function<void(const Trigram *trgmptr, size_t len)> cb)
 {
-       return reinterpret_cast<const unsigned char *>(data + trgmptr->offset);
+       if (left > right) {
+               cb(nullptr, 0);
+               return;
+       }
+       uint32_t mid = (left + right) / 2;
+       engine->submit_read(fd, sizeof(Trigram) * 2, trigram_offset + sizeof(Trigram) * mid, [this, trgm, left, mid, right, cb{ move(cb) }](string s) {
+               const Trigram *trgmptr = reinterpret_cast<const Trigram *>(s.data());
+               const Trigram *next_trgmptr = trgmptr + 1;
+               if (trgmptr->trgm < trgm) {
+                       binary_search_trigram(trgm, mid + 1, right, move(cb));
+               } else if (trgmptr->trgm > trgm) {
+                       binary_search_trigram(trgm, left, mid - 1, move(cb));
+               } else {
+                       cb(trgmptr, next_trgmptr->offset - trgmptr->offset);
+               }
+       });
 }
 
-string_view Corpus::get_compressed_filename_block(uint32_t docid) const
+void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const
 {
-       const char *compressed = (const char *)(data + filename_offsets[docid]);
-       size_t compressed_size =
-               filename_offsets[docid + 1] -
-               filename_offsets[docid];  // Allowed we have a sentinel block at the end.
-       return { compressed, compressed_size };
+       // Read the file offset from this docid and the next one.
+       // This is always allowed, since we have a sentinel block at the end.
+       engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string s) {
+               const uint64_t *ptr = reinterpret_cast<const uint64_t *>(s.data());
+               off_t offset = ptr[0];
+               size_t len = ptr[1] - ptr[0];
+               engine->submit_read(fd, len, offset, cb);
+       });
 }
 
 size_t Corpus::get_num_filename_blocks() const
 {
        // The beginning of the filename blocks is the end of the filename index blocks.
-       const uint64_t *filename_offsets_end = (const uint64_t *)(data + filename_offsets[0]);
+       uint64_t end;
+       complete_pread(fd, &end, sizeof(end), filename_index_offset);
 
        // Subtract the sentinel block.
-       return filename_offsets_end - filename_offsets - 1;
+       return (end - filename_index_offset) / sizeof(uint64_t) - 1;
 }
 
-size_t scan_docid(const string &needle, uint32_t docid, const Corpus &corpus,
-                  unordered_map<string, bool> *access_rx_cache)
+size_t scan_file_block(const string &needle, string_view compressed,
+                       unordered_map<string, bool> *access_rx_cache)
 {
-       string_view compressed = corpus.get_compressed_filename_block(docid);
        size_t matched = 0;
 
        string block;
@@ -173,6 +232,49 @@ size_t scan_docid(const string &needle, uint32_t docid, const Corpus &corpus,
        return matched;
 }
 
+size_t scan_docids(const string &needle, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
+{
+       Serializer docids_in_order;
+       unordered_map<string, bool> access_rx_cache;
+       size_t matched = 0;
+       for (size_t i = 0; i < docids.size(); ++i) {
+               uint32_t docid = docids[i];
+               corpus.get_compressed_filename_block(docid, [i, &matched, &needle, &access_rx_cache, &docids_in_order](string compressed) {
+                       docids_in_order.do_or_wait(i, [&matched, &needle, compressed{ move(compressed) }, &access_rx_cache] {
+                               matched += scan_file_block(needle, compressed, &access_rx_cache);
+                       });
+               });
+       }
+       engine->finish();
+       return matched;
+}
+
+// We do this sequentially, as it's faster than scattering
+// a lot of I/O through io_uring and hoping the kernel will
+// coalesce it plus readahead for us.
+void scan_all_docids(const string &needle, int fd, const Corpus &corpus, IOUringEngine *engine)
+{
+       unordered_map<string, bool> access_rx_cache;
+       uint32_t num_blocks = corpus.get_num_filename_blocks();
+       unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
+       complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
+       string compressed;
+       for (uint32_t io_docid = 0; io_docid < num_blocks; io_docid += 32) {
+               uint32_t last_docid = std::min(io_docid + 32, num_blocks);
+               size_t io_len = offsets[last_docid] - offsets[io_docid];
+               if (compressed.size() < io_len) {
+                       compressed.resize(io_len);
+               }
+               complete_pread(fd, &compressed[0], io_len, offsets[io_docid]);
+
+               for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
+                       size_t relative_offset = offsets[docid] - offsets[io_docid];
+                       size_t len = offsets[docid + 1] - offsets[docid];
+                       scan_file_block(needle, {&compressed[relative_offset], len}, &access_rx_cache);
+               }
+       }
+}
+
 void do_search_file(const string &needle, const char *filename)
 {
        int fd = open(filename, O_RDONLY);
@@ -187,98 +289,107 @@ void do_search_file(const string &needle, const char *filename)
                exit(EXIT_FAILURE);
        }
 
-       // steady_clock::time_point start = steady_clock::now();
+       steady_clock::time_point start __attribute__((unused)) = steady_clock::now();
        if (access("/", R_OK | X_OK)) {
                // We can't find anything, no need to bother...
                return;
        }
 
-       Corpus corpus(fd);
+       IOUringEngine engine;
+       Corpus corpus(fd, &engine);
+       dprintf("Corpus init took %.1f ms.\n", 1e3 * duration<float>(steady_clock::now() - start).count());
 
        if (needle.size() < 3) {
                // Too short for trigram matching. Apply brute force.
                // (We could have searched through all trigrams that matched
                // the pattern and done a union of them, but that's a lot of
                // work for fairly unclear gain.)
-               unordered_map<string, bool> access_rx_cache;
-               uint32_t num_blocks = corpus.get_num_filename_blocks();
-               for (uint32_t docid = 0; docid < num_blocks; ++docid) {
-                       scan_docid(needle, docid, corpus, &access_rx_cache);
-               }
+               scan_all_docids(needle, fd, corpus, &engine);
                return;
        }
 
-       vector<const Trigram *> trigrams;
+       vector<pair<Trigram, size_t>> trigrams;
        for (size_t i = 0; i < needle.size() - 2; ++i) {
                uint32_t trgm = read_trigram(needle, i);
-               const Trigram *trgmptr = corpus.find_trigram(trgm);
-               if (trgmptr == nullptr) {
-                       dprintf("trigram %06x isn't found, we abort the search\n", trgm);
-                       return;
-               }
-               trigrams.push_back(trgmptr);
+               pair<uint32_t, uint32_t> range{ 0, corpus.num_trigrams - 1 };
+               corpus.find_trigram(trgm, [trgm, &trigrams](const Trigram *trgmptr, size_t len) {
+                       if (trgmptr == nullptr) {
+                               dprintf("trigram %06x isn't found, we abort the search\n", trgm);
+                               return;
+                       }
+                       trigrams.emplace_back(*trgmptr, len);
+               });
        }
+       engine.finish();
+       dprintf("Binary search took %.1f ms.\n", 1e3 * duration<float>(steady_clock::now() - start).count());
+
        sort(trigrams.begin(), trigrams.end());
        {
                auto last = unique(trigrams.begin(), trigrams.end());
                trigrams.erase(last, trigrams.end());
        }
        sort(trigrams.begin(), trigrams.end(),
-            [&](const Trigram *a, const Trigram *b) {
-                    return a->num_docids < b->num_docids;
+            [&](const pair<Trigram, size_t> &a, const pair<Trigram, size_t> &b) {
+                    return a.first.num_docids < b.first.num_docids;
             });
 
        vector<uint32_t> in1, in2, out;
-       for (const Trigram *trgmptr : trigrams) {
-               // uint32_t trgm = trgmptr->trgm;
-               size_t num = trgmptr->num_docids;
-               const unsigned char *pldata = corpus.get_compressed_posting_list(trgmptr);
-               if (in1.empty()) {
-                       in1.resize(num + 128);
-                       p4nd1dec128v32(const_cast<unsigned char *>(pldata), num, &in1[0]);
-                       in1.resize(num);
-                       dprintf("trigram '%c%c%c' decoded to %zu entries\n", trgm & 0xff,
-                               (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num);
-               } else {
-                       if (num > in1.size() * 100) {
-                               dprintf("trigram '%c%c%c' has %zu entries, ignoring the rest (will "
-                                       "weed out false positives later)\n",
-                                       trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num);
-                               break;
-                       }
+       bool done = false;
+       for (auto [trgmptr, len] : trigrams) {
+               if (!in1.empty() && trgmptr.num_docids > in1.size() * 100) {
+                       uint32_t trgm __attribute__((unused)) = trgmptr.trgm;
+                       dprintf("trigram '%c%c%c' (%zu bytes) has %u entries, ignoring the rest (will "
+                               "weed out false positives later)\n",
+                               trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff,
+                               len, trgmptr.num_docids);
+                       break;
+               }
 
-                       if (in2.size() < num + 128) {
-                               in2.resize(num + 128);
-                       }
-                       p4nd1dec128v32(const_cast<unsigned char *>(pldata), num, &in2[0]);
-
-                       out.clear();
-                       set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num,
-                                        back_inserter(out));
-                       swap(in1, out);
-                       dprintf("trigram '%c%c%c' decoded to %zu entries, %zu left\n",
-                               trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, num,
-                               in1.size());
-                       if (in1.empty()) {
-                               dprintf("no matches (intersection list is empty)\n");
+               // Only stay a certain amount ahead, so that we don't spend I/O
+               // on reading the latter, large posting lists. We are unlikely
+               // to need them anyway, even if they should come in first.
+               if (engine.get_waiting_reads() >= 5) {
+                       engine.finish();
+                       if (done)
                                break;
-                       }
                }
+               engine.submit_read(fd, len, trgmptr.offset, [trgmptr, len, &done, &in1, &in2, &out](string s) {
+                       uint32_t trgm __attribute__((unused)) = trgmptr.trgm;
+                       size_t num = trgmptr.num_docids;
+                       unsigned char *pldata = reinterpret_cast<unsigned char *>(s.data());
+                       if (in1.empty()) {
+                               in1.resize(num + 128);
+                               p4nd1dec128v32(pldata, num, &in1[0]);
+                               in1.resize(num);
+                               dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries\n", trgm & 0xff,
+                                       (trgm >> 8) & 0xff, (trgm >> 16) & 0xff, len, num);
+                       } else {
+                               if (in2.size() < num + 128) {
+                                       in2.resize(num + 128);
+                               }
+                               p4nd1dec128v32(pldata, num, &in2[0]);
+
+                               out.clear();
+                               set_intersection(in1.begin(), in1.end(), in2.begin(), in2.begin() + num,
+                                                back_inserter(out));
+                               swap(in1, out);
+                               dprintf("trigram '%c%c%c' (%zu bytes) decoded to %zu entries, %zu left\n",
+                                       trgm & 0xff, (trgm >> 8) & 0xff, (trgm >> 16) & 0xff,
+                                       len, num, in1.size());
+                               if (in1.empty()) {
+                                       dprintf("no matches (intersection list is empty)\n");
+                                       done = true;
+                               }
+                       }
+               });
        }
-       steady_clock::time_point end = steady_clock::now();
-
+       engine.finish();
        dprintf("Intersection took %.1f ms. Doing final verification and printing:\n",
-               1e3 * duration<float>(end - start).count());
-
-       unordered_map<string, bool> access_rx_cache;
+               1e3 * duration<float>(steady_clock::now() - start).count());
 
-       int matched = 0;
-       for (uint32_t docid : in1) {
-               matched += scan_docid(needle, docid, corpus, &access_rx_cache);
-       }
-       end = steady_clock::now();
-       dprintf("Done in %.1f ms, found %d matches.\n",
-               1e3 * duration<float>(end - start).count(), matched);
+       size_t matched __attribute__((unused)) = scan_docids(needle, in1, corpus, &engine);
+       dprintf("Done in %.1f ms, found %zu matches.\n",
+               1e3 * duration<float>(steady_clock::now() - start).count(), matched);
 }
 
 int main(int argc, char **argv)