From: Steinar H. Gunderson
Date: Thu, 15 Oct 2020 20:41:42 +0000 (+0200)
Subject: Multithread linear scans.
X-Git-Tag: 1.0.4~8
X-Git-Url: https://git.sesse.net/?p=plocate;a=commitdiff_plain;h=75c42d06f8eb513afc1976e82033ffb893b101b8

Multithread linear scans.

When we have a scan that we cannot accelerate with trigrams (very short
patterns, or regexes), we need to go through all of the file names, like
mlocate does. This is usually CPU-bound, so fire up threads. We leave one
core/hyperthread for the I/O and add a thread for each of the rest (this
is probably bad on dual-core, but it's a simple thing that will do for
now, and should be fairly safe).

The bottleneck now is Serializer. I first tried just putting a mutex on
it, which worked fine on eight hyperthreads (i.e., four real cores, my
laptop), but caused huge contention with 40 (20 cores, my old dual-socket
Haswell). Sending data back through per-thread queues seems to work a lot
better, but we're still spending a lot of time in Serializer; witness
that --count is much faster for such a search.
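As a rough illustration of the contention argument above, compare a single
sink behind one mutex with per-thread result buffers that one consumer
drains in batches. This is a minimal sketch with made-up names (LockedSink,
PerThreadResults), not code from this patch:

    #include <cstdint>
    #include <mutex>
    #include <string>
    #include <utility>
    #include <vector>

    // One shared sink behind one mutex: every print() from every worker
    // serializes on sink_mu, which is the choke point described above.
    struct LockedSink {
    	std::mutex sink_mu;
    	std::vector<std::pair<uint64_t, std::string>> out;

    	void print(uint64_t seq, std::string msg)
    	{
    		std::lock_guard<std::mutex> lock(sink_mu);
    		out.emplace_back(seq, std::move(msg));
    	}
    };

    // Per-thread buffer: each worker takes only its own, mostly
    // uncontended lock; a single consumer swaps the whole batch out
    // and feeds the real sink.
    struct PerThreadResults {
    	std::mutex mu;
    	std::vector<std::pair<uint64_t, std::string>> results;

    	void add(uint64_t seq, std::string msg)
    	{
    		std::lock_guard<std::mutex> lock(mu);
    		results.emplace_back(seq, std::move(msg));
    	}

    	std::vector<std::pair<uint64_t, std::string>> drain()
    	{
    		std::vector<std::pair<uint64_t, std::string>> batch;
    		std::lock_guard<std::mutex> lock(mu);
    		batch.swap(results);
    		return batch;
    	}
    };

Draining whole batches means each worker only ever touches its own lock,
which is why the per-thread queues scale where the single mutex did not.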
---

diff --git a/meson.build b/meson.build
index cbb0cfb..2e68459 100644
--- a/meson.build
+++ b/meson.build
@@ -6,13 +6,14 @@ add_project_arguments('-DPLOCATE_VERSION="' + meson.project_version() + '"', lan
 cxx = meson.get_compiler('cpp')
 uringdep = dependency('liburing', required: false)
 zstddep = dependency('libzstd')
+threaddep = dependency('threads')
 
 if not uringdep.found()
 	add_project_arguments('-DWITHOUT_URING', language: 'cpp')
 endif
 
 executable('plocate', ['plocate.cpp', 'io_uring_engine.cpp', 'turbopfor.cpp', 'parse_trigrams.cpp'],
-	dependencies: [uringdep, zstddep],
+	dependencies: [uringdep, zstddep, threaddep],
	install: true,
	install_mode: ['rwxr-sr-x', 'root', 'mlocate'])
 executable('plocate-build', 'plocate-build.cpp',
diff --git a/plocate.cpp b/plocate.cpp
index dfb12d4..e313458 100644
--- a/plocate.cpp
+++ b/plocate.cpp
@@ -6,8 +6,10 @@
 #include "unique_sort.h"
 
 #include <algorithm>
+#include <atomic>
 #include <chrono>
 #include <condition_variable>
+#include <deque>
 #include <fcntl.h>
 #include <functional>
 #include <getopt.h>
@@ -18,6 +20,7 @@
 #include <iterator>
 #include <limits>
 #include <memory>
+#include <mutex>
 #include <queue>
 #include <regex.h>
 #include <stdint.h>
@@ -26,6 +29,7 @@
 #include <string.h>
 #include <string>
 #include <string_view>
+#include <thread>
 #include <unistd.h>
 #include <unordered_map>
 #include <utility>
@@ -51,6 +55,8 @@ int64_t limit_left = numeric_limits<int64_t>::max();
 steady_clock::time_point start;
 ZSTD_DDict *ddict = nullptr;
 
+regex_t compile_regex(const string &needle);
+
 void apply_limit()
 {
 	if (--limit_left > 0) {
@@ -64,10 +70,16 @@ void apply_limit()
 	exit(0);
 }
 
-class Serializer {
+class ResultReceiver {
+public:
+	virtual ~ResultReceiver() = default;
+	virtual void print(uint64_t seq, uint64_t skip, const string msg) = 0;
+};
+
+class Serializer : public ResultReceiver {
 public:
 	~Serializer() { assert(limit_left <= 0 || pending.empty()); }
-	void print(uint64_t seq, uint64_t skip, const string msg);
+	void print(uint64_t seq, uint64_t skip, const string msg) override;
 
 private:
 	uint64_t next_seq = 0;
@@ -157,10 +169,12 @@ private:
 	};
 	map<string, vector<PendingStat>> pending_stats;
 	IOUringEngine *engine;
+	mutex mu;
 };
 
 void AccessRXCache::check_access(const char *filename, bool allow_async, function<void(bool)> cb)
 {
+	lock_guard<mutex> lock(mu);
 	if (engine == nullptr || !engine->get_supports_stat()) {
 		allow_async = false;
 	}
@@ -311,8 +325,8 @@ size_t Corpus::get_num_filename_blocks() const
 }
 
 void scan_file_block(const vector<Needle> &needles, string_view compressed,
-                     AccessRXCache *access_rx_cache, uint64_t seq, Serializer *serializer,
-                     uint64_t *matched)
+                     AccessRXCache *access_rx_cache, uint64_t seq, ResultReceiver *serializer,
+                     atomic<uint64_t> *matched)
 {
 	unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
 	if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
@@ -323,7 +337,7 @@ void scan_file_block(const vector<Needle> &needles, string_view compressed,
 	string block;
 	block.resize(uncompressed_len + 1);
 
-	static ZSTD_DCtx *ctx = ZSTD_createDCtx();  // Reused across calls.
+	static thread_local ZSTD_DCtx *ctx = ZSTD_createDCtx();  // Reused across calls.
 	size_t err;
 
 	if (ddict != nullptr) {
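A note on the thread_local change above: a ZSTD_DCtx is cheap to reuse but
must not be used from two threads at once, so the usual pattern is one
context per thread, reused across calls. A minimal sketch under that
assumption (decompress_block is a made-up name, not plocate's API):

    #include <zstd.h>

    size_t decompress_block(const char *in, size_t in_len, char *out, size_t out_capacity)
    {
    	// One decompression context per thread, created lazily on first
    	// use and reused for every subsequent block; never freed, by design.
    	static thread_local ZSTD_DCtx *ctx = ZSTD_createDCtx();

    	// Returns the number of decompressed bytes, or an error code;
    	// callers should check the result with ZSTD_isError().
    	return ZSTD_decompressDCtx(ctx, out, out_capacity, in, in_len);
    }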
@@ -385,7 +399,7 @@ size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids
 {
 	Serializer docids_in_order;
 	AccessRXCache access_rx_cache(engine);
-	uint64_t matched = 0;
+	atomic<uint64_t> matched{0};
 	for (size_t i = 0; i < docids.size(); ++i) {
 		uint32_t docid = docids[i];
 		corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
@@ -396,9 +410,48 @@ size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids
 	return matched;
 }
 
+struct WorkerThread {
+	thread t;
+
+	// We use a result queue instead of synchronizing Serializer,
+	// since a lock on it becomes a huge choke point if there are
+	// lots of threads.
+	mutex result_mu;
+	vector<tuple<uint64_t, uint64_t, string>> results;
+};
+
+class WorkerThreadReceiver : public ResultReceiver {
+public:
+	WorkerThreadReceiver(WorkerThread *wt) : wt(wt) {}
+
+	void print(uint64_t seq, uint64_t skip, const string msg) override
+	{
+		lock_guard<mutex> lock(wt->result_mu);
+		wt->results.emplace_back(seq, skip, move(msg));
+	}
+
+private:
+	WorkerThread *wt;
+};
+
+void deliver_results(WorkerThread *wt, Serializer *serializer)
+{
+	vector<tuple<uint64_t, uint64_t, string>> results;
+	{
+		lock_guard<mutex> lock(wt->result_mu);
+		results = move(wt->results);
+	}
+	for (const auto &result : results) {
+		serializer->print(get<0>(result), get<1>(result), move(get<2>(result)));
+	}
+}
+
 // We do this sequentially, as it's faster than scattering
 // a lot of I/O through io_uring and hoping the kernel will
-// coalesce it plus readahead for us.
+// coalesce it plus readahead for us. Since we assume that
+// we will primarily be CPU-bound, we'll be firing up one
+// worker thread for each spare core (the last one will
+// only be doing I/O). access() is still synchronous.
 uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &corpus)
 {
 	{
@@ -412,12 +465,59 @@ uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &co
 	}
 
 	AccessRXCache access_rx_cache(nullptr);
-	Serializer serializer;  // Mostly dummy; handles only the limit.
+	Serializer serializer;
 	uint32_t num_blocks = corpus.get_num_filename_blocks();
 	unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
 	complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
+	atomic<uint64_t> matched{0};
+
+	mutex mu;
+	condition_variable queue_added, queue_removed;
+	deque<tuple<uint32_t, uint32_t, string>> work_queue;  // Under mu.
+	bool done = false;  // Under mu.
+
+	unsigned num_threads = max<int>(sysconf(_SC_NPROCESSORS_ONLN) - 1, 1);
+	dprintf("Using %u worker threads for linear scan.\n", num_threads);
+	unique_ptr<WorkerThread[]> threads(new WorkerThread[num_threads]);
+	for (unsigned i = 0; i < num_threads; ++i) {
+		threads[i].t = thread([&threads, &mu, &queue_added, &queue_removed, &work_queue, &done, &offsets, &needles, &access_rx_cache, &matched, i] {
+			// regcomp() takes a lock on the regex, so each thread will need its own.
+			const vector<Needle> *use_needles = &needles;
+			vector<Needle> recompiled_needles;
+			if (i != 0 && patterns_are_regex) {
+				recompiled_needles = needles;
+				for (Needle &needle : recompiled_needles) {
+					needle.re = compile_regex(needle.str);
+				}
+				use_needles = &recompiled_needles;
+			}
+
+			WorkerThreadReceiver receiver(&threads[i]);
+			for (;;) {
+				uint32_t io_docid, last_docid;
+				string compressed;
+
+				{
+					unique_lock<mutex> lock(mu);
+					queue_added.wait(lock, [&work_queue, &done] { return !work_queue.empty() || done; });
+					if (done && work_queue.empty()) {
+						return;
+					}
+					tie(io_docid, last_docid, compressed) = move(work_queue.front());
+					work_queue.pop_front();
+					queue_removed.notify_all();
+				}
+
+				for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
+					size_t relative_offset = offsets[docid] - offsets[io_docid];
+					size_t len = offsets[docid + 1] - offsets[docid];
+					scan_file_block(*use_needles, { &compressed[relative_offset], len }, &access_rx_cache, docid, &receiver, &matched);
+				}
+			}
+		});
+	}
+
 	string compressed;
-	uint64_t matched = 0;
 	for (uint32_t io_docid = 0; io_docid < num_blocks; io_docid += 32) {
 		uint32_t last_docid = std::min(io_docid + 32, num_blocks);
 		size_t io_len = offsets[last_docid] - offsets[io_docid];
@@ -426,11 +526,27 @@ uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &co
 		}
 		complete_pread(fd, &compressed[0], io_len, offsets[io_docid]);
 
-		for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
-			size_t relative_offset = offsets[docid] - offsets[io_docid];
-			size_t len = offsets[docid + 1] - offsets[docid];
-			scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, docid, &serializer, &matched);
+		{
+			unique_lock<mutex> lock(mu);
+			queue_removed.wait(lock, [&work_queue] { return work_queue.size() < 256; });  // Allow ~2MB of data queued up.
+			work_queue.emplace_back(io_docid, last_docid, move(compressed));
+			queue_added.notify_one();  // Avoid the thundering herd.
 		}
+
+		// Pick up some results, so that we are sure that we won't just overload.
+		// (Seemingly, going through all of these causes slowness with many threads,
+		// but taking only one is OK.)
+		unsigned i = io_docid / 32;
+		deliver_results(&threads[i % num_threads], &serializer);
+	}
+	{
+		lock_guard<mutex> lock(mu);
+		done = true;
+		queue_added.notify_all();
+	}
+	for (unsigned i = 0; i < num_threads; ++i) {
+		threads[i].t.join();
+		deliver_results(&threads[i], &serializer);
 	}
 	return matched;
 }
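For reference, the queue discipline that the scan_all_docids() hunks above
implement can be condensed into a self-contained sketch. The names are
illustrative (WorkQueue, push, pop, shut_down are not from the patch); the
256-entry cap mirrors the ~2MB bound in the comment above:

    #include <condition_variable>
    #include <cstdint>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <tuple>

    // Bounded producer/consumer queue in the style of scan_all_docids():
    // the producer (the I/O thread) blocks when 256 batches are pending,
    // workers block when the queue is empty, and done + notify_all()
    // drains and stops everyone.
    struct WorkQueue {
    	std::mutex mu;
    	std::condition_variable added, removed;
    	std::deque<std::tuple<uint32_t, uint32_t, std::string>> q;  // Under mu.
    	bool done = false;                                          // Under mu.

    	void push(uint32_t first, uint32_t last, std::string compressed)
    	{
    		std::unique_lock<std::mutex> lock(mu);
    		removed.wait(lock, [this] { return q.size() < 256; });  // Backpressure.
    		q.emplace_back(first, last, std::move(compressed));
    		added.notify_one();  // Wake one worker, not the whole herd.
    	}

    	// Returns false once the queue is shut down and empty.
    	bool pop(std::tuple<uint32_t, uint32_t, std::string> *out)
    	{
    		std::unique_lock<std::mutex> lock(mu);
    		added.wait(lock, [this] { return !q.empty() || done; });
    		if (q.empty()) {
    			return false;
    		}
    		*out = std::move(q.front());
    		q.pop_front();
    		removed.notify_all();
    		return true;
    	}

    	void shut_down()
    	{
    		std::lock_guard<std::mutex> lock(mu);
    		done = true;
    		added.notify_all();
    	}
    };

Note the asymmetry: notify_one() on insert avoids waking every worker for a
single new batch, while the shutdown path uses notify_all() so that every
sleeping worker observes done and exits.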