From 3c481c94adcbd3d0aa3a9767129664d7253cf189 Mon Sep 17 00:00:00 2001
From: "Steinar H. Gunderson"
Date: Sun, 11 Oct 2020 19:59:11 +0200
Subject: [PATCH] Do the access checking asynchronously if possible.

There are many issues involved:

 - There's no access() support in io_uring (yet?), so we fake it by
   doing statx() on the directory first, which primes the dentry cache
   so that synchronous access() becomes very fast. It is a bit tricky,
   since multiple access checks could be going on at the same time,
   which then need to all wait for the same statx() call.

 - Not all kernels support statx() in io_uring (support starts
   from 5.6).

 - Serialization now becomes two-level, and more involved. We don't
   have an obvious single counter anymore, so we need to be able to
   start a docid without knowing how many candidates there are (and
   thus, be able to tell Serializer that we are at the end).

 - Limit becomes more tricky, since there can be more calls on the way
   back. We solve this by moving limit into Serializer, and
   hard-exiting when we hit the limit.

 - We need to prioritize statx() calls ahead of read(), so that we
   don't end up with very delayed output when the new read() calls
   generate even more statx() calls and we get a huge backlog of
   calls. (We can't prioritize in the kernel, but we can on the
   overflow queue we're managing ourselves.) This is especially
   important with --limit.
---
 io_uring_engine.cpp | 141 ++++++++++++++++++++++-----
 io_uring_engine.h   |  46 +++++++--
 plocate.cpp         | 226 ++++++++++++++++++++++++++++++--------------
 3 files changed, 309 insertions(+), 104 deletions(-)

diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp
index 03160b1..70497a4 100644
--- a/io_uring_engine.cpp
+++ b/io_uring_engine.cpp
@@ -1,4 +1,6 @@
+#include <assert.h>
 #include <errno.h>
+#include <fcntl.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -11,6 +13,7 @@
 #include <functional>
 #include <iosfwd>
 #include <liburing.h>
+#include <memory>
 #include <queue>
 #include <string>
@@ -29,6 +32,38 @@ IOUringEngine::IOUringEngine(size_t slop_bytes)
 	}
 #endif
 	using_uring = (ret >= 0);
+
+#ifndef WITHOUT_URING
+	if (using_uring) {
+		io_uring_probe *probe = io_uring_get_probe_ring(&ring);
+		supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
+		if (!supports_stat) {
+			dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
+		}
+		free(probe);
+	}
+#endif
+}
+
+void IOUringEngine::submit_stat(const char *path, std::function<void()> cb)
+{
+	assert(supports_stat);
+
+#ifndef WITHOUT_URING
+	if (pending_reads < queue_depth) {
+		io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+		if (sqe == nullptr) {
+			fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+			exit(1);
+		}
+		submit_stat_internal(sqe, strdup(path), move(cb));
+	} else {
+		QueuedStat qs;
+		qs.cb = move(cb);
+		qs.pathname = strdup(path);
+		queued_stats.push(move(qs));
+	}
+#endif
 }
 
 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
@@ -64,9 +99,30 @@ void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len,
 		fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
 		exit(1);
 	}
-	PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
 
-	io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
+	PendingRead *pending = new PendingRead;
+	pending->op = OP_READ;
+	pending->read_cb = move(cb);
+	pending->read.buf = buf;
+	pending->read.len = len;
+	pending->read.fd = fd;
+	pending->read.offset = offset;
+	pending->read.iov = iovec{ buf, len };
+
+	io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
 	io_uring_sqe_set_data(sqe, pending);
 	++pending_reads;
 }
+
+void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb)
+{
+	PendingRead *pending = new PendingRead;
+	pending->op = OP_STAT;
+	pending->stat_cb = move(cb);
+	pending->stat.pathname = path;
+	pending->stat.buf = new struct statx;
+
+	io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT, STATX_MODE, pending->stat.buf);
+	io_uring_sqe_set_data(sqe, pending);
+	++pending_reads;
+}
@@ -104,33 +160,14 @@ void IOUringEngine::finish()
 		io_uring_for_each_cqe(&ring, head, cqe)
 		{
 			PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
-			if (cqe->res <= 0) {
-				fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
-				exit(1);
-			}
-
-			if (size_t(cqe->res) < pending->iov.iov_len) {
-				// Incomplete read, so resubmit it.
-				pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
-				pending->iov.iov_len -= cqe->res;
-				pending->offset += cqe->res;
-				io_uring_cqe_seen(&ring, cqe);
-
-				io_uring_sqe *sqe = io_uring_get_sqe(&ring);
-				if (sqe == nullptr) {
-					fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
-					exit(1);
-				}
-				io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
-				io_uring_sqe_set_data(sqe, pending);
-				anything_to_submit = true;
-			} else {
+			if (pending->op == OP_STAT) {
 				io_uring_cqe_seen(&ring, cqe);
 				--pending_reads;
 
 				size_t old_pending_reads = pending_reads;
-				pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
-				free(pending->buf);
+				pending->stat_cb();
+				free(pending->stat.pathname);
+				delete pending->stat.buf;
 				delete pending;
 
 				if (pending_reads != old_pending_reads) {
@@ -138,7 +175,61 @@ void IOUringEngine::finish()
 					// so we need to re-submit.
 					anything_to_submit = true;
 				}
+			} else {
+				if (cqe->res <= 0) {
+					fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
+					exit(1);
+				}
+
+				if (size_t(cqe->res) < pending->read.iov.iov_len) {
+					// Incomplete read, so resubmit it.
+					pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
+					pending->read.iov.iov_len -= cqe->res;
+					pending->read.offset += cqe->res;
+					io_uring_cqe_seen(&ring, cqe);
+
+					io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+					if (sqe == nullptr) {
+						fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+						exit(1);
+					}
+					io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
+					io_uring_sqe_set_data(sqe, pending);
+					anything_to_submit = true;
+				} else {
+					io_uring_cqe_seen(&ring, cqe);
+					--pending_reads;
+
+					size_t old_pending_reads = pending_reads;
+					pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
+					free(pending->read.buf);
+					delete pending;
+
+					if (pending_reads != old_pending_reads) {
+						// A new read was made in the callback (and not queued),
+						// so we need to re-submit.
+						anything_to_submit = true;
+					}
+				}
+			}
+		}
+
+		// See if there are any queued stats we can submit now.
+		// Running a stat means we're very close to printing out a match,
+		// which is more important than reading more blocks from disk.
+		// (Even if those blocks returned early, they would only generate
+		// more matches that would be blocked by this one in Serializer.)
+		// Thus, prioritize stats.
+		while (!queued_stats.empty() && pending_reads < queue_depth) {
+			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+			if (sqe == nullptr) {
+				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+				exit(1);
 			}
+			QueuedStat &qs = queued_stats.front();
+			submit_stat_internal(sqe, qs.pathname, move(qs.cb));
+			queued_stats.pop();
+			anything_to_submit = true;
 		}
 
 		// See if there are any queued reads we can submit now.
diff --git a/io_uring_engine.h b/io_uring_engine.h
index c165a40..258d460 100644
--- a/io_uring_engine.h
+++ b/io_uring_engine.h
@@ -17,17 +17,26 @@ class IOUringEngine {
 public:
 	IOUringEngine(size_t slop_bytes);
 	void submit_read(int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
+
+	// NOTE: We just do the stat() to get the data into the dentry cache for fast access;
+	// we don't care about the return value. Thus, the callback takes no parameters.
+	// If we have no io_uring, the callback will be called immediately, with no stat()
+	// call being done.
+	void submit_stat(const char *path, std::function<void()> cb);
+	bool get_supports_stat() { return supports_stat; }
+
 	void finish();
 	size_t get_waiting_reads() const { return pending_reads + queued_reads.size(); }
 
 private:
 #ifndef WITHOUT_URING
 	void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
+	void submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb);
 	io_uring ring;
 #endif
 	size_t pending_reads = 0;  // Number of requests we have going in the ring.
-	bool using_uring;
+	bool using_uring, supports_stat = false;
 	const size_t slop_bytes;
 
 	struct QueuedRead {
@@ -38,15 +47,36 @@ private:
 	};
 	std::queue<QueuedRead> queued_reads;
 
+	struct QueuedStat {
+		char *pathname;  // Owned by us.
+		std::function<void()> cb;
+	};
+	std::queue<QueuedStat> queued_stats;
+
+	enum Op { OP_READ,
+		  OP_STAT };
+
 	struct PendingRead {
-		void *buf;
-		size_t len;
-		std::function<void(std::string_view)> cb;
+		Op op;
 
-		// For re-submission.
-		int fd;
-		off_t offset;
-		iovec iov;
+		std::function<void(std::string_view)> read_cb;
+		std::function<void()> stat_cb;
+
+		union {
+			struct {
+				void *buf;
+				size_t len;
+
+				// For re-submission.
+				int fd;
+				off_t offset;
+				iovec iov;
+			} read;
+			struct {
+				char *pathname;
+				struct statx *buf;
+			} stat;
+		};
 	};
 
 	// 256 simultaneous requests should be ample, for slow and fast media alike.
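
As an aside, the probe-then-prime pattern the engine uses above is easy to try in
isolation. The following standalone sketch is not part of the patch (the path,
queue depth, and error handling are placeholder choices), but it shows the same
sequence against plain liburing: probe for IORING_OP_STATX, submit a throwaway
statx(), then rely on the warmed dentry cache to make the synchronous access()
cheap.

// Standalone sketch (not from the patch): probe for statx() support in
// io_uring and use a dummy statx() to prime the dentry cache, so that the
// following synchronous access() should not block on I/O.
// Path and queue depth are arbitrary. Build: g++ -O2 sketch.cpp -luring
#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>

int main()
{
	io_uring ring;
	if (io_uring_queue_init(/*entries=*/8, &ring, /*flags=*/0) < 0) {
		return 1;  // No io_uring at all; would fall back to plain access().
	}

	// statx() in io_uring needs kernel 5.6+, so probe instead of assuming.
	io_uring_probe *probe = io_uring_get_probe_ring(&ring);
	bool supports_statx = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
	free(probe);  // The patch frees the probe the same way.
	if (!supports_statx) {
		return 1;
	}

	// Fire off the dummy statx(); the result itself is ignored.
	struct statx stx;
	io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	io_uring_prep_statx(sqe, AT_FDCWD, "/usr/bin", AT_STATX_SYNC_AS_STAT, STATX_MODE, &stx);
	io_uring_submit(&ring);

	io_uring_cqe *cqe;
	io_uring_wait_cqe(&ring, &cqe);
	io_uring_cqe_seen(&ring, cqe);

	// The dentry cache is now warm; this access() should return immediately.
	printf("/usr/bin readable+searchable: %d\n", access("/usr/bin", R_OK | X_OK) == 0);
	io_uring_queue_exit(&ring);
	return 0;
}
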
diff --git a/plocate.cpp b/plocate.cpp
index a76c288..9f4eee3 100644
--- a/plocate.cpp
+++ b/plocate.cpp
@@ -16,6 +16,7 @@
 #include <iterator>
 #include <limits>
 #include <locale.h>
+#include <map>
 #include <memory>
 #include <queue>
 #include <regex.h>
@@ -45,18 +46,33 @@ bool use_debug = false;
 bool patterns_are_regex = false;
 bool use_extended_regex = false;
 int64_t limit_matches = numeric_limits<int64_t>::max();
+int64_t limit_left = numeric_limits<int64_t>::max();
+
+steady_clock::time_point start;
+
+void apply_limit()
+{
+	if (--limit_left > 0) {
+		return;
+	}
+	dprintf("Done in %.1f ms, found %" PRId64 " matches.\n",
+	        1e3 * duration<float>(steady_clock::now() - start).count(), limit_matches);
+	if (only_count) {
+		printf("%" PRId64 "\n", limit_matches);
+	}
+	exit(0);
+}
 
 class Serializer {
 public:
-	bool ready_to_print(int seq) { return next_seq == seq; }
-	void print_delayed(int seq, const vector<string> msg);
-	void release_current();
+	~Serializer() { assert(limit_left <= 0 || pending.empty()); }
+	void print(uint64_t seq, uint64_t skip, const string msg);
 
 private:
-	int next_seq = 0;
+	uint64_t next_seq = 0;
 	struct Element {
-		int seq;
-		vector<string> msg;
+		uint64_t seq, skip;
+		string msg;
 
 		bool operator<(const Element &other) const
 		{
@@ -66,28 +82,42 @@ private:
 	priority_queue<Element> pending;
 };
 
-void Serializer::print_delayed(int seq, const vector<string> msg)
+void Serializer::print(uint64_t seq, uint64_t skip, const string msg)
 {
-	pending.push(Element{ seq, move(msg) });
-}
+	if (only_count) {
+		if (!msg.empty()) {
+			apply_limit();
+		}
+		return;
+	}
 
-void Serializer::release_current()
-{
-	++next_seq;
+	if (next_seq != seq) {
+		pending.push(Element{ seq, skip, move(msg) });
+		return;
+	}
+
+	if (!msg.empty()) {
+		if (print_nul) {
+			printf("%s%c", msg.c_str(), 0);
+		} else {
+			printf("%s\n", msg.c_str());
+		}
+		apply_limit();
+	}
+	next_seq += skip;
 
 	// See if any delayed prints can now be dealt with.
 	while (!pending.empty() && pending.top().seq == next_seq) {
-		if (limit_matches-- <= 0)
-			return;
-		for (const string &msg : pending.top().msg) {
+		if (!pending.top().msg.empty()) {
 			if (print_nul) {
-				printf("%s%c", msg.c_str(), 0);
+				printf("%s%c", pending.top().msg.c_str(), 0);
 			} else {
-				printf("%s\n", msg.c_str());
+				printf("%s\n", pending.top().msg.c_str());
 			}
+			apply_limit();
 		}
+		next_seq += pending.top().skip;
 		pending.pop();
-		++next_seq;
 	}
 }
 
@@ -112,27 +142,83 @@ bool matches(const Needle &needle, const char *haystack)
 	}
 }
 
-bool has_access(const char *filename,
-                unordered_map<string, bool> *access_rx_cache)
+class AccessRXCache {
+public:
+	AccessRXCache(IOUringEngine *engine)
+		: engine(engine) {}
+	void check_access(const char *filename, bool allow_async, function<void(bool)> cb);
+
+private:
+	unordered_map<string, bool> cache;
+	struct PendingStat {
+		string filename;
+		function<void(bool)> cb;
+	};
+	map<string, vector<PendingStat>> pending_stats;
+	IOUringEngine *engine;
+};
+
+void AccessRXCache::check_access(const char *filename, bool allow_async, function<void(bool)> cb)
 {
-	const char *end = strchr(filename + 1, '/');
-	while (end != nullptr) {
-		string parent_path(filename, end);
-		auto it = access_rx_cache->find(parent_path);
-		bool ok;
-		if (it == access_rx_cache->end()) {
-			ok = access(parent_path.c_str(), R_OK | X_OK) == 0;
-			access_rx_cache->emplace(move(parent_path), ok);
-		} else {
-			ok = it->second;
+	if (!engine->get_supports_stat()) {
+		allow_async = false;
+	}
+
+	for (const char *end = strchr(filename + 1, '/'); end != nullptr; end = strchr(end + 1, '/')) {
+		string parent_path(filename, end - filename);  // string_view from C++20.
+		auto cache_it = cache.find(parent_path);
+		if (cache_it != cache.end()) {
+			// Found in the cache.
+			if (!cache_it->second) {
+				cb(false);
+				return;
+			}
+			continue;
+		}
+
+		if (!allow_async) {
+			bool ok = access(parent_path.c_str(), R_OK | X_OK) == 0;
+			cache.emplace(parent_path, ok);
+			if (!ok) {
+				cb(false);
+				return;
+			}
+			continue;
 		}
-		if (!ok) {
-			return false;
+
+		// We want to call access(), but it could block on I/O. io_uring doesn't support
+		// access(), but we can do a dummy asynchronous statx() to populate the kernel's
+		// cache, which nearly always makes the next access() instantaneous.
+
+		// See if there's already a pending stat that matches this,
+		// or is a subdirectory.
+		auto it = pending_stats.lower_bound(parent_path);
+		if (it != pending_stats.end() && it->first.size() >= parent_path.size() &&
+		    it->first.compare(0, parent_path.size(), parent_path) == 0) {
+			it->second.emplace_back(PendingStat{ filename, move(cb) });
+		} else {
+			it = pending_stats.emplace(filename, vector<PendingStat>{}).first;
+			engine->submit_stat(filename, [this, it, filename{ strdup(filename) }, cb{ move(cb) }] {
+				// The stat returned, so now do the actual access() calls.
+				// All of them should be in cache, so don't fire off new statx()
+				// calls during that check.
+				check_access(filename, /*allow_async=*/false, move(cb));
+				free(filename);
+
+				// Call all others that waited for the same stat() to finish.
+				// They may fire off new stat() calls if needed.
+				vector<PendingStat> pending = move(it->second);
+				pending_stats.erase(it);
+				for (PendingStat &ps : pending) {
+					check_access(ps.filename.c_str(), /*allow_async=*/true, move(ps.cb));
+				}
+			});
 		}
-		end = strchr(end + 1, '/');
+		return;  // The rest will happen in async context.
 	}
-	return true;
+
+	// Passed all checks.
+	cb(true);
 }
 
 class Corpus {
@@ -217,12 +303,10 @@ size_t Corpus::get_num_filename_blocks() const
 	return hdr.num_docids;
 }
 
-uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
-                         unordered_map<string, bool> *access_rx_cache, int seq,
-                         Serializer *serializer)
+void scan_file_block(const vector<Needle> &needles, string_view compressed,
+                     AccessRXCache *access_rx_cache, uint64_t seq, Serializer *serializer,
+                     IOUringEngine *engine, size_t *matched)
 {
-	uint64_t matched = 0;
-
 	unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
 	if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
 		fprintf(stderr, "ZSTD_getFrameContentSize() failed\n");
@@ -240,9 +324,23 @@ uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
 	}
 	block[block.size() - 1] = '\0';
 
-	bool immediate_print = (serializer == nullptr || serializer->ready_to_print(seq));
-	vector<string> delayed;
+	auto test_candidate = [&](const char *filename, uint64_t local_seq, uint64_t next_seq) {
+		access_rx_cache->check_access(filename, /*allow_async=*/true, [matched, serializer, local_seq, next_seq, filename{ strdup(filename) }](bool ok) {
+			if (ok) {
+				++*matched;
+				serializer->print(local_seq, next_seq - local_seq, filename);
+			} else {
+				serializer->print(local_seq, next_seq - local_seq, "");
+			}
+			free(filename);
+		});
+	};
+
+	// We need to know the next sequence number before inserting into Serializer,
+	// so always buffer one candidate.
+	const char *pending_candidate = nullptr;
+
+	uint64_t local_seq = seq << 32;
 	for (const char *filename = block.data();
 	     filename != block.data() + block.size();
 	     filename += strlen(filename) + 1) {
@@ -253,42 +351,30 @@ uint64_t scan_file_block(const vector<Needle> &needles, string_view compressed,
 				break;
 			}
 		}
-		if (found && has_access(filename, access_rx_cache)) {
-			if (limit_matches-- <= 0)
-				break;
-			++matched;
-			if (only_count)
-				continue;
-			if (immediate_print) {
-				if (print_nul) {
-					printf("%s%c", filename, 0);
-				} else {
-					printf("%s\n", filename);
-				}
-			} else {
-				delayed.push_back(filename);
+		if (found) {
+			if (pending_candidate != nullptr) {
+				test_candidate(pending_candidate, local_seq, local_seq + 1);
+				++local_seq;
 			}
+			pending_candidate = filename;
 		}
 	}
-	if (serializer != nullptr && !only_count) {
-		if (immediate_print) {
-			serializer->release_current();
-		} else {
-			serializer->print_delayed(seq, move(delayed));
-		}
+	if (pending_candidate == nullptr) {
+		serializer->print(seq << 32, 1ULL << 32, "");
+	} else {
+		test_candidate(pending_candidate, local_seq, (seq + 1) << 32);
 	}
-	return matched;
 }
 
 size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
 {
 	Serializer docids_in_order;
-	unordered_map<string, bool> access_rx_cache;
+	AccessRXCache access_rx_cache(engine);
 	uint64_t matched = 0;
 	for (size_t i = 0; i < docids.size(); ++i) {
 		uint32_t docid = docids[i];
-		corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
-			matched += scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order);
+		corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, engine, &docids_in_order](string_view compressed) {
+			scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order, engine, &matched);
 		});
 	}
 	engine->finish();
@@ -300,7 +386,7 @@ size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids
 // coalesce it plus readahead for us.
 uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &corpus, IOUringEngine *engine)
 {
-	unordered_map<string, bool> access_rx_cache;
+	AccessRXCache access_rx_cache(engine);
 	uint32_t num_blocks = corpus.get_num_filename_blocks();
 	unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
 	complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
@@ -317,9 +403,7 @@ uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &co
 		for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
 			size_t relative_offset = offsets[docid] - offsets[io_docid];
 			size_t len = offsets[docid + 1] - offsets[docid];
-			matched += scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, 0, nullptr);
-			if (limit_matches <= 0)
-				return matched;
+			scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, 0, nullptr, engine, &matched);
 		}
 	}
 	return matched;
@@ -386,7 +470,7 @@ void do_search_file(const vector<Needle> &needles, const char *filename)
 		exit(EXIT_FAILURE);
 	}
 
-	steady_clock::time_point start __attribute__((unused)) = steady_clock::now();
+	start = steady_clock::now();
 	if (access("/", R_OK | X_OK)) {
 		// We can't find anything, no need to bother...
 		return;
@@ -662,7 +746,7 @@ int main(int argc, char **argv)
 		break;
 	case 'l':
 	case 'n':
-		limit_matches = atoll(optarg);
+		limit_matches = limit_left = atoll(optarg);
 		if (limit_matches <= 0) {
 			fprintf(stderr, "Error: limit must be a strictly positive number.\n");
 			exit(1);
-- 
2.39.2
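
A closing note on the two-level sequencing: each docid owns a 32-bit-shifted
window of sequence numbers, candidates within a block take consecutive numbers
inside that window, and the final print for a block skips ahead to the next
block's window, so a block can finish without knowing its candidate count up
front. The toy below is a compilable simplification of the patch's Serializer
(it drops only_count, print_nul, and the limit handling, and always routes
through the priority queue); the paths and docids in main() are made up purely
to show the reordering.

// Toy version of the seq/skip serializer: results arrive out of order, each
// tagged with a sequence number and a "skip" saying how far the next expected
// sequence number jumps. Empty messages only advance the sequence.
#include <cstdint>
#include <cstdio>
#include <queue>
#include <string>
#include <utility>

struct Element {
	uint64_t seq, skip;
	std::string msg;
	bool operator<(const Element &other) const
	{
		return other.seq < seq;  // Makes the priority queue a min-heap on seq.
	}
};

class Serializer {
public:
	void print(uint64_t seq, uint64_t skip, std::string msg)
	{
		pending.push(Element{ seq, skip, std::move(msg) });
		// Drain everything that is now in order.
		while (!pending.empty() && pending.top().seq == next_seq) {
			if (!pending.top().msg.empty()) {
				printf("%s\n", pending.top().msg.c_str());
			}
			next_seq += pending.top().skip;
			pending.pop();
		}
	}

private:
	uint64_t next_seq = 0;
	std::priority_queue<Element> pending;
};

int main()
{
	// Candidate i of docid d gets seq = (d << 32) + i; the last candidate of
	// docid d skips to (d + 1) << 32, and an empty docid emits one empty
	// message with skip = 1ULL << 32. Deliver out of order on purpose.
	Serializer s;
	s.print(1ULL << 32, 1ULL << 32, "");                            // docid 1: no matches
	s.print((2ULL << 32) + 0, (3ULL << 32) - (2ULL << 32), "/c");   // docid 2: only match
	s.print((0ULL << 32) + 0, 1, "/a");                             // docid 0: first match
	s.print((0ULL << 32) + 1, (1ULL << 32) - 1, "/b");              // docid 0: last match
	// Prints /a, /b, /c, in document order despite arrival order.
	return 0;
}
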