]> git.sesse.net Git - plocate/commitdiff
Do the access checking asynchronously if possible.
authorSteinar H. Gunderson <steinar+git@gunderson.no>
Sun, 11 Oct 2020 17:59:11 +0000 (19:59 +0200)
committerSteinar H. Gunderson <steinar+git@gunderson.no>
Sun, 11 Oct 2020 18:09:52 +0000 (20:09 +0200)
There are many issues involved:

 - There's no access() support in io_uring (yet?), so we fake it
   by doing statx() on the directory first, which primes the
   dentry cache so that synchronous access() becomes very fast.
   It is a bit tricky, since multiple access checks could be
   going on at the same time, which the need to all wait
   for the same statx() call.
 - Not even all kernels support statx() in io_uring (support starts
   from 5.6+).
 - Serialization now becomes two-level, and more involved.
   We don't have an obvious single counter anymore, so we need
   to be able to start a docid without knowing how many candidates
   there are (and thus, be able to tell Serializer that we are
   at the end).
 - Limit becomes more tricky, since there can be more calls on
   the way back. We solve this by moving limit into Serializer,
   and hard-exiting when we hit the limit.
 - We need to prioritize statx() calls ahead of read(), so that
   we don't end up with very delayed output when the new read()
   calls generate even more statx() calls and we get a huge
   backlog of calls. (We can't prioritize in the kernel, but we
   can on the overflow queue we're managing ourselves.) This is
   especially important with --limit.

io_uring_engine.cpp
io_uring_engine.h
plocate.cpp

index 03160b12a4abb1f04e83a84acfcf84d7589ad1f1..70497a44cb122acf501d52c7062db124e6551e9d 100644 (file)
@@ -1,4 +1,6 @@
+#include <assert.h>
 #include <errno.h>
+#include <fcntl.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -11,6 +13,7 @@
 #include <functional>
 #include <iosfwd>
 #include <string>
+#include <sys/stat.h>
 #include <unistd.h>
 #include <utility>
 
@@ -29,6 +32,38 @@ IOUringEngine::IOUringEngine(size_t slop_bytes)
        }
 #endif
        using_uring = (ret >= 0);
+
+#ifndef WITHOUT_URING
+       if (using_uring) {
+               io_uring_probe *probe = io_uring_get_probe_ring(&ring);
+               supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
+               if (!supports_stat) {
+                       dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
+               }
+               free(probe);
+       }
+#endif
+}
+
+void IOUringEngine::submit_stat(const char *path, std::function<void()> cb)
+{
+       assert(supports_stat);
+
+#ifndef WITHOUT_URING
+       if (pending_reads < queue_depth) {
+               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+               if (sqe == nullptr) {
+                       fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                       exit(1);
+               }
+               submit_stat_internal(sqe, strdup(path), move(cb));
+       } else {
+               QueuedStat qs;
+               qs.cb = move(cb);
+               qs.pathname = strdup(path);
+               queued_stats.push(move(qs));
+       }
+#endif
 }
 
 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
@@ -64,9 +99,30 @@ void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len,
                fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
                exit(1);
        }
-       PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
 
-       io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
+       PendingRead *pending = new PendingRead;
+       pending->op = OP_READ;
+       pending->read_cb = move(cb);
+       pending->read.buf = buf;
+       pending->read.len = len;
+       pending->read.fd = fd;
+       pending->read.offset = offset;
+       pending->read.iov = iovec{ buf, len };
+
+       io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
+       io_uring_sqe_set_data(sqe, pending);
+       ++pending_reads;
+}
+
+void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb)
+{
+       PendingRead *pending = new PendingRead;
+       pending->op = OP_STAT;
+       pending->stat_cb = move(cb);
+       pending->stat.pathname = path;
+       pending->stat.buf = new struct statx;
+
+       io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT, STATX_MODE, pending->stat.buf);
        io_uring_sqe_set_data(sqe, pending);
        ++pending_reads;
 }
@@ -104,33 +160,14 @@ void IOUringEngine::finish()
                io_uring_for_each_cqe(&ring, head, cqe)
                {
                        PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
-                       if (cqe->res <= 0) {
-                               fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
-                               exit(1);
-                       }
-
-                       if (size_t(cqe->res) < pending->iov.iov_len) {
-                               // Incomplete read, so resubmit it.
-                               pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
-                               pending->iov.iov_len -= cqe->res;
-                               pending->offset += cqe->res;
-                               io_uring_cqe_seen(&ring, cqe);
-
-                               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
-                               if (sqe == nullptr) {
-                                       fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
-                                       exit(1);
-                               }
-                               io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
-                               io_uring_sqe_set_data(sqe, pending);
-                               anything_to_submit = true;
-                       } else {
+                       if (pending->op == OP_STAT) {
                                io_uring_cqe_seen(&ring, cqe);
                                --pending_reads;
 
                                size_t old_pending_reads = pending_reads;
-                               pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
-                               free(pending->buf);
+                               pending->stat_cb();
+                               free(pending->stat.pathname);
+                               delete pending->stat.buf;
                                delete pending;
 
                                if (pending_reads != old_pending_reads) {
@@ -138,7 +175,61 @@ void IOUringEngine::finish()
                                        // so we need to re-submit.
                                        anything_to_submit = true;
                                }
+                       } else {
+                               if (cqe->res <= 0) {
+                                       fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
+                                       exit(1);
+                               }
+
+                               if (size_t(cqe->res) < pending->read.iov.iov_len) {
+                                       // Incomplete read, so resubmit it.
+                                       pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
+                                       pending->read.iov.iov_len -= cqe->res;
+                                       pending->read.offset += cqe->res;
+                                       io_uring_cqe_seen(&ring, cqe);
+
+                                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                                       if (sqe == nullptr) {
+                                               fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+                                               exit(1);
+                                       }
+                                       io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
+                                       io_uring_sqe_set_data(sqe, pending);
+                                       anything_to_submit = true;
+                               } else {
+                                       io_uring_cqe_seen(&ring, cqe);
+                                       --pending_reads;
+
+                                       size_t old_pending_reads = pending_reads;
+                                       pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
+                                       free(pending->read.buf);
+                                       delete pending;
+
+                                       if (pending_reads != old_pending_reads) {
+                                               // A new read was made in the callback (and not queued),
+                                               // so we need to re-submit.
+                                               anything_to_submit = true;
+                                       }
+                               }
+                       }
+               }
+
+               // See if there are any queued stats we can submit now.
+               // Running a stat means we're very close to printing out a match,
+               // which is more important than reading more blocks from disk.
+               // (Even if those blocks returned early, they would only generate
+               // more matches that would be blocked by this one in Serializer.)
+               // Thus, prioritize stats.
+               while (!queued_stats.empty() && pending_reads < queue_depth) {
+                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                       if (sqe == nullptr) {
+                               fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                               exit(1);
                        }
+                       QueuedStat &qs = queued_stats.front();
+                       submit_stat_internal(sqe, qs.pathname, move(qs.cb));
+                       queued_stats.pop();
+                       anything_to_submit = true;
                }
 
                // See if there are any queued reads we can submit now.
index c165a40f51df19f2054ff6210d526e52c2e3e3fb..258d460d1219f03807fa114cf0485924b6c0ba94 100644 (file)
@@ -17,17 +17,26 @@ class IOUringEngine {
 public:
        IOUringEngine(size_t slop_bytes);
        void submit_read(int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
+
+       // NOTE: We just do the stat() to get the data into the dentry cache for fast access;
+       // we don't care about the return value. Thus, the callback has no parameter lists.
+       // If we have no io_uring, the callback will be made immediately, with no stat() call
+       // being done.
+       void submit_stat(const char *path, std::function<void()> cb);
+       bool get_supports_stat() { return supports_stat; }
+
        void finish();
        size_t get_waiting_reads() const { return pending_reads + queued_reads.size(); }
 
 private:
 #ifndef WITHOUT_URING
        void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
+       void submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb);
 
        io_uring ring;
 #endif
        size_t pending_reads = 0;  // Number of requests we have going in the ring.
-       bool using_uring;
+       bool using_uring, supports_stat = false;
        const size_t slop_bytes;
 
        struct QueuedRead {
@@ -38,15 +47,36 @@ private:
        };
        std::queue<QueuedRead> queued_reads;
 
+       struct QueuedStat {
+               char *pathname;  // Owned by us.
+               std::function<void()> cb;
+       };
+       std::queue<QueuedStat> queued_stats;
+
+       enum Op { OP_READ,
+                 OP_STAT };
+
        struct PendingRead {
-               void *buf;
-               size_t len;
-               std::function<void(std::string_view)> cb;
+               Op op;
 
-               // For re-submission.
-               int fd;
-               off_t offset;
-               iovec iov;
+               std::function<void(std::string_view)> read_cb;
+               std::function<void()> stat_cb;
+
+               union {
+                       struct {
+                               void *buf;
+                               size_t len;
+
+                               // For re-submission.
+                               int fd;
+                               off_t offset;
+                               iovec iov;
+                       } read;
+                       struct {
+                               char *pathname;
+                               struct statx *buf;
+                       } stat;
+               };
        };
 
        // 256 simultaneous requests should be ample, for slow and fast media alike.
index a76c288a2bca86f0a11ebfec822d34721fa38335..9f4eee3d74899a733fecc5ea3f1b8c559743494c 100644 (file)
@@ -16,6 +16,7 @@
 #include <iosfwd>
 #include <iterator>
 #include <limits>
+#include <map>
 #include <memory>
 #include <queue>
 #include <regex.h>
@@ -45,18 +46,33 @@ bool use_debug = false;
 bool patterns_are_regex = false;
 bool use_extended_regex = false;
 int64_t limit_matches = numeric_limits<int64_t>::max();
+int64_t limit_left = numeric_limits<int64_t>::max();
+
+steady_clock::time_point start;
+
+void apply_limit()
+{
+       if (--limit_left > 0) {
+               return;
+       }
+       dprintf("Done in %.1f ms, found %" PRId64 " matches.\n",
+               1e3 * duration<float>(steady_clock::now() - start).count(), limit_matches);
+       if (only_count) {
+               printf("%ld\n", limit_matches);
+       }
+       exit(0);
+}
 
 class Serializer {
 public:
-       bool ready_to_print(int seq) { return next_seq == seq; }
-       void print_delayed(int seq, const vector<string> msg);
-       void release_current();
+       ~Serializer() { assert(limit_left <= 0 || pending.empty()); }
+       void print(uint64_t seq, uint64_t skip, const string msg);
 
 private:
-       int next_seq = 0;
+       uint64_t next_seq = 0;
        struct Element {
-               int seq;
-               vector<string> msg;
+               uint64_t seq, skip;
+               string msg;
 
                bool operator<(const Element &other) const
                {
@@ -66,28 +82,42 @@ private:
        priority_queue<Element> pending;
 };
 
-void Serializer::print_delayed(int seq, const vector<string> msg)
+void Serializer::print(uint64_t seq, uint64_t skip, const string msg)
 {
-       pending.push(Element{ seq, move(msg) });
-}
+       if (only_count) {
+               if (!msg.empty()) {
+                       apply_limit();
+               }
+               return;
+       }
 
-void Serializer::release_current()
-{
-       ++next_seq;
+       if (next_seq != seq) {
+               pending.push(Element{ seq, skip, move(msg) });
+               return;
+       }
+
+       if (!msg.empty()) {
+               if (print_nul) {
+                       printf("%s%c", msg.c_str(), 0);
+               } else {
+                       printf("%s\n", msg.c_str());
+               }
+               apply_limit();
+       }
+       next_seq += skip;
 
        // See if any delayed prints can now be dealt with.
        while (!pending.empty() && pending.top().seq == next_seq) {
-               if (limit_matches-- <= 0)
-                       return;
-               for (const string &msg : pending.top().msg) {
+               if (!pending.top().msg.empty()) {
                        if (print_nul) {
-                               printf("%s%c", msg.c_str(), 0);
+                               printf("%s%c", pending.top().msg.c_str(), 0);
                        } else {
-                               printf("%s\n", msg.c_str());
+                               printf("%s\n", pending.top().msg.c_str());
                        }
+                       apply_limit();
                }
+               next_seq += pending.top().skip;
                pending.pop();
-               ++next_seq;
        }
 }
 
@@ -112,27 +142,83 @@ bool matches(const Needle &needle, const char *haystack)
        }
 }
 
-bool has_access(const char *filename,
-                unordered_map<string, bool> *access_rx_cache)
+class AccessRXCache {
+public:
+       AccessRXCache(IOUringEngine *engine)
+               : engine(engine) {}
+       void check_access(const char *filename, bool allow_async, function<void(bool)> cb);
+
+private:
+       unordered_map<string, bool> cache;
+       struct PendingStat {
+               string filename;
+               function<void(bool)> cb;
+       };
+       map<string, vector<PendingStat>> pending_stats;
+       IOUringEngine *engine;
+};
+
+void AccessRXCache::check_access(const char *filename, bool allow_async, function<void(bool)> cb)
 {
-       const char *end = strchr(filename + 1, '/');
-       while (end != nullptr) {
-               string parent_path(filename, end);
-               auto it = access_rx_cache->find(parent_path);
-               bool ok;
-               if (it == access_rx_cache->end()) {
-                       ok = access(parent_path.c_str(), R_OK | X_OK) == 0;
-                       access_rx_cache->emplace(move(parent_path), ok);
-               } else {
-                       ok = it->second;
+       if (!engine->get_supports_stat()) {
+               allow_async = false;
+       }
+
+       for (const char *end = strchr(filename + 1, '/'); end != nullptr; end = strchr(end + 1, '/')) {
+               string parent_path(filename, end - filename);  // string_view from C++20.
+               auto cache_it = cache.find(parent_path);
+               if (cache_it != cache.end()) {
+                       // Found in the cache.
+                       if (!cache_it->second) {
+                               cb(false);
+                               return;
+                       }
+                       continue;
+               }
+
+               if (!allow_async) {
+                       bool ok = access(parent_path.c_str(), R_OK | X_OK) == 0;
+                       cache.emplace(parent_path, ok);
+                       if (!ok) {
+                               cb(false);
+                               return;
+                       }
+                       continue;
                }
-               if (!ok) {
-                       return false;
+
+               // We want to call access(), but it could block on I/O. io_uring doesn't support
+               // access(), but we can do a dummy asynchonous statx() to populate the kernel's cache,
+               // which nearly always makes the next access() instantaneous.
+
+               // See if there's already a pending stat that matches this,
+               // or is a subdirectory.
+               auto it = pending_stats.lower_bound(parent_path);
+               if (it != pending_stats.end() && it->first.size() >= parent_path.size() &&
+                   it->first.compare(0, parent_path.size(), parent_path) == 0) {
+                       it->second.emplace_back(PendingStat{ filename, move(cb) });
+               } else {
+                       it = pending_stats.emplace(filename, vector<PendingStat>{}).first;
+                       engine->submit_stat(filename, [this, it, filename{ strdup(filename) }, cb{ move(cb) }] {
+                               // The stat returned, so now do the actual access() calls.
+                               // All of them should be in cache, so don't fire off new statx()
+                               // calls during that check.
+                               check_access(filename, /*allow_async=*/false, move(cb));
+                               free(filename);
+
+                               // Call all others that waited for the same stat() to finish.
+                               // They may fire off new stat() calls if needed.
+                               vector<PendingStat> pending = move(it->second);
+                               pending_stats.erase(it);
+                               for (PendingStat &ps : pending) {
+                                       check_access(ps.filename.c_str(), /*allow_async=*/true, move(ps.cb));
+                               }
+                       });
                }
-               end = strchr(end + 1, '/');
+               return;  // The rest will happen in async context.
        }
 
-       return true;
+       // Passed all checks.
+       cb(true);
 }
 
 class Corpus {
@@ -217,12 +303,10 @@ size_t Corpus::get_num_filename_blocks() const
        return hdr.num_docids;
 }
 
-uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
-                         unordered_map<string, bool> *access_rx_cache, int seq,
-                         Serializer *serializer)
+void scan_file_block(const vector<Needle> &needles, string_view compressed,
+                     AccessRXCache *access_rx_cache, uint64_t seq, Serializer *serializer, IOUringEngine *engine,
+                     size_t *matched)
 {
-       uint64_t matched = 0;
-
        unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
        if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
                fprintf(stderr, "ZSTD_getFrameContentSize() failed\n");
@@ -240,9 +324,23 @@ uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
        }
        block[block.size() - 1] = '\0';
 
-       bool immediate_print = (serializer == nullptr || serializer->ready_to_print(seq));
-       vector<string> delayed;
+       auto test_candidate = [&](const char *filename, uint64_t local_seq, uint64_t next_seq) {
+               access_rx_cache->check_access(filename, /*allow_async=*/true, [matched, serializer, local_seq, next_seq, filename{ strdup(filename) }](bool ok) {
+                       if (ok) {
+                               ++*matched;
+                               serializer->print(local_seq, next_seq - local_seq, filename);
+                       } else {
+                               serializer->print(local_seq, next_seq - local_seq, "");
+                       }
+                       free(filename);
+               });
+       };
+
+       // We need to know the next sequence number before inserting into Serializer,
+       // so always buffer one candidate.
+       const char *pending_candidate = nullptr;
 
+       uint64_t local_seq = seq << 32;
        for (const char *filename = block.data();
             filename != block.data() + block.size();
             filename += strlen(filename) + 1) {
@@ -253,42 +351,30 @@ uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
                                break;
                        }
                }
-               if (found && has_access(filename, access_rx_cache)) {
-                       if (limit_matches-- <= 0)
-                               break;
-                       ++matched;
-                       if (only_count)
-                               continue;
-                       if (immediate_print) {
-                               if (print_nul) {
-                                       printf("%s%c", filename, 0);
-                               } else {
-                                       printf("%s\n", filename);
-                               }
-                       } else {
-                               delayed.push_back(filename);
+               if (found) {
+                       if (pending_candidate != nullptr) {
+                               test_candidate(pending_candidate, local_seq, local_seq + 1);
+                               ++local_seq;
                        }
+                       pending_candidate = filename;
                }
        }
-       if (serializer != nullptr && !only_count) {
-               if (immediate_print) {
-                       serializer->release_current();
-               } else {
-                       serializer->print_delayed(seq, move(delayed));
-               }
+       if (pending_candidate == nullptr) {
+               serializer->print(seq << 32, 1ULL << 32, "");
+       } else {
+               test_candidate(pending_candidate, local_seq, (seq + 1) << 32);
        }
-       return matched;
 }
 
 size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
 {
        Serializer docids_in_order;
-       unordered_map<string, bool> access_rx_cache;
+       AccessRXCache access_rx_cache(engine);
        uint64_t matched = 0;
        for (size_t i = 0; i < docids.size(); ++i) {
                uint32_t docid = docids[i];
-               corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
-                       matched += scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order);
+               corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, engine, &docids_in_order](string_view compressed) {
+                       scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order, engine, &matched);
                });
        }
        engine->finish();
@@ -300,7 +386,7 @@ size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids
 // coalesce it plus readahead for us.
 uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &corpus, IOUringEngine *engine)
 {
-       unordered_map<string, bool> access_rx_cache;
+       AccessRXCache access_rx_cache(engine);
        uint32_t num_blocks = corpus.get_num_filename_blocks();
        unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
        complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
@@ -317,9 +403,7 @@ uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &co
                for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
                        size_t relative_offset = offsets[docid] - offsets[io_docid];
                        size_t len = offsets[docid + 1] - offsets[docid];
-                       matched += scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, 0, nullptr);
-                       if (limit_matches <= 0)
-                               return matched;
+                       scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, 0, nullptr, engine, &matched);
                }
        }
        return matched;
@@ -386,7 +470,7 @@ void do_search_file(const vector<Needle> &needles, const char *filename)
                exit(EXIT_FAILURE);
        }
 
-       steady_clock::time_point start __attribute__((unused)) = steady_clock::now();
+       start = steady_clock::now();
        if (access("/", R_OK | X_OK)) {
                // We can't find anything, no need to bother...
                return;
@@ -662,7 +746,7 @@ int main(int argc, char **argv)
                        break;
                case 'l':
                case 'n':
-                       limit_matches = atoll(optarg);
+                       limit_matches = limit_left = atoll(optarg);
                        if (limit_matches <= 0) {
                                fprintf(stderr, "Error: limit must be a strictly positive number.\n");
                                exit(1);